This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 422d422b93c655c58f84a83f719f7bbfb94bfb83 Author: Jark Wu <[email protected]> AuthorDate: Thu Feb 12 00:15:55 2026 +0800 [kv] Support COUNT(*) pushdown for primary key tables and introduce GetTableStats RPC (#2651) --- .../java/org/apache/fluss/client/admin/Admin.java | 8 + .../org/apache/fluss/client/admin/FlussAdmin.java | 110 +++++++++++++ .../fluss/client/utils/ClientRpcMessageUtils.java | 27 ++++ .../fluss/client/admin/FlussAdminITCase.java | 173 +++++++++++++++++++++ .../java/org/apache/fluss/metadata/TableStats.java | 63 ++++++++ .../fluss/flink/source/FlinkTableSource.java | 3 +- .../apache/fluss/flink/utils/PushdownUtils.java | 78 +++++++--- .../flink/sink/testutils/TestAdminAdapter.java | 6 + .../flink/source/FlinkTableSourceBatchITCase.java | 55 ++++++- .../rpc/entity/TableStatsResultForBucket.java | 44 ++++++ .../fluss/rpc/gateway/TabletServerGateway.java | 10 ++ .../fluss/rpc/netty/client/ServerConnection.java | 1 + .../org/apache/fluss/rpc/protocol/ApiKeys.java | 3 +- fluss-rpc/src/main/proto/FlussApi.proto | 46 ++++++ .../fluss/rpc/TestingTabletGatewayService.java | 7 + .../apache/fluss/server/kv/KvRecoverHelper.java | 20 ++- .../java/org/apache/fluss/server/kv/KvTablet.java | 33 +++- .../fluss/server/kv/prewrite/KvPreWriteBuffer.java | 65 ++++++-- .../org/apache/fluss/server/log/LogTablet.java | 4 + .../org/apache/fluss/server/replica/Replica.java | 15 ++ .../fluss/server/replica/ReplicaManager.java | 18 +++ .../apache/fluss/server/tablet/TabletService.java | 15 ++ .../fluss/server/utils/ServerRpcMessageUtils.java | 44 ++++++ .../org/apache/fluss/server/kv/KvTabletTest.java | 143 +++++++++++++++++ .../server/kv/prewrite/KvPreWriteBufferTest.java | 77 +++++++-- .../server/replica/KvReplicaRestoreITCase.java | 64 +++++++- .../server/tablet/TestTabletServerGateway.java | 7 + 27 files changed, 1063 insertions(+), 76 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index 04e7cb214..d8fe57660 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -65,6 +65,7 @@ import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metadata.TableStats; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; @@ -533,6 +534,13 @@ public interface Admin extends AutoCloseable { Collection<Integer> buckets, OffsetSpec offsetSpec); + /** + * Asynchronously gets the statistics of this table. + * + * @return A future TableStats + */ + CompletableFuture<TableStats> getTableStats(TablePath tablePath); + /** * Retrieves ACL entries filtered by principal for the specified resource. * diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index 76662dc22..8f39a6e42 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -30,6 +30,7 @@ import org.apache.fluss.cluster.rebalance.RebalanceProgress; import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DatabaseInfo; @@ -44,6 +45,7 @@ import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import 
org.apache.fluss.metadata.TableStats; import org.apache.fluss.rpc.GatewayClientProxy; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.AdminGateway; @@ -70,6 +72,8 @@ import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest; import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest; import org.apache.fluss.rpc.messages.GetTableInfoRequest; import org.apache.fluss.rpc.messages.GetTableSchemaRequest; +import org.apache.fluss.rpc.messages.GetTableStatsRequest; +import org.apache.fluss.rpc.messages.GetTableStatsResponse; import org.apache.fluss.rpc.messages.ListAclsRequest; import org.apache.fluss.rpc.messages.ListDatabasesRequest; import org.apache.fluss.rpc.messages.ListDatabasesResponse; @@ -82,6 +86,7 @@ import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket; import org.apache.fluss.rpc.messages.PbPartitionSpec; import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.messages.PbTableStatsRespForBucket; import org.apache.fluss.rpc.messages.RebalanceRequest; import org.apache.fluss.rpc.messages.RebalanceResponse; import org.apache.fluss.rpc.messages.RemoveServerTagRequest; @@ -91,6 +96,7 @@ import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.concurrent.FutureUtils; import javax.annotation.Nullable; @@ -106,6 +112,7 @@ import java.util.concurrent.CompletableFuture; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest; +import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeGetTableStatsRequest; import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterProducerOffsetsRequest; @@ -457,6 +464,47 @@ public class FlussAdmin implements Admin { return listOffsets(PhysicalTablePath.of(tablePath, partitionName), buckets, offsetSpec); } + @Override + public CompletableFuture<TableStats> getTableStats(TablePath tablePath) { + metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); + TableInfo tableInfo = getTableInfo(tablePath).join(); + try { + int bucketCount = tableInfo.getNumBuckets(); + List<PartitionInfo> partitionInfos; + if (tableInfo.isPartitioned()) { + partitionInfos = listPartitionInfos(tablePath).get(); + } else { + partitionInfos = Collections.singletonList(null); + } + // create all TableBuckets for each partition and bucket combination + Map<TableBucket, CompletableFuture<Long>> bucketToRowCountMap = new HashMap<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + for (int bucket = 0; bucket < bucketCount; bucket++) { + TableBucket tb = + new TableBucket( + tableInfo.getTableId(), + partitionInfo == null ? 
null : partitionInfo.getPartitionId(), + bucket); + bucketToRowCountMap.put(tb, new CompletableFuture<>()); + } + } + Map<Integer, GetTableStatsRequest> requestMap = + prepareTableStatsRequests( + metadataUpdater, bucketToRowCountMap.keySet(), tablePath); + sendTableStatsRequest( + metadataUpdater, tableInfo.getTableId(), requestMap, bucketToRowCountMap); + return FutureUtils.combineAll(bucketToRowCountMap.values()) + .thenApply( + counts -> { + long totalRowCount = counts.stream().reduce(0L, Long::sum); + return new TableStats(totalRowCount); + }); + } catch (Exception e) { + throw new FlussRuntimeException( + String.format("Failed to get row count for the table '%s'.", tablePath), e); + } + } + private ListOffsetsResult listOffsets( PhysicalTablePath physicalTablePath, Collection<Integer> buckets, @@ -678,6 +726,68 @@ public class FlussAdmin implements Admin { // nothing to do yet } + private static Map<Integer, GetTableStatsRequest> prepareTableStatsRequests( + MetadataUpdater metadataUpdater, Collection<TableBucket> buckets, TablePath tablePath) { + Map<Integer, List<TableBucket>> nodeForBucketList = new HashMap<>(); + for (TableBucket tb : buckets) { + int leader = metadataUpdater.leaderFor(tablePath, tb); + nodeForBucketList.computeIfAbsent(leader, k -> new ArrayList<>()).add(tb); + } + + Map<Integer, GetTableStatsRequest> requests = new HashMap<>(); + nodeForBucketList.forEach( + (leader, tbs) -> requests.put(leader, makeGetTableStatsRequest(tbs))); + return requests; + } + + private static void sendTableStatsRequest( + MetadataUpdater metadataUpdater, + long tableId, + Map<Integer, GetTableStatsRequest> leaderToRequestMap, + Map<TableBucket, CompletableFuture<Long>> bucketToRowCountMap) { + leaderToRequestMap.forEach( + (leader, request) -> { + TabletServerGateway gateway = + metadataUpdater.newTabletServerClientForNode(leader); + if (gateway == null) { + throw new LeaderNotAvailableException( + "Server " + leader + " is not found in metadata cache."); + } 
else { + gateway.getTableStats(request) + .whenComplete( + (response, t) -> + handleTableStatsResponse( + response, t, tableId, bucketToRowCountMap)); + } + }); + } + + private static void handleTableStatsResponse( + GetTableStatsResponse response, + Throwable t, + long tableId, + Map<TableBucket, CompletableFuture<Long>> bucketToRowCountMap) { + if (t != null) { + // fail all futures to fail fast + bucketToRowCountMap.values().forEach(f -> f.completeExceptionally(t)); + return; + } + for (PbTableStatsRespForBucket resp : response.getBucketsRespsList()) { + TableBucket tb = + new TableBucket( + tableId, + resp.hasPartitionId() ? resp.getPartitionId() : null, + resp.getBucketId()); + if (resp.hasErrorCode()) { + bucketToRowCountMap + .get(tb) + .completeExceptionally(ApiError.fromErrorMessage(resp).exception()); + } else { + bucketToRowCountMap.get(tb).complete(resp.getRowCount()); + } + } + } + private static Map<Integer, ListOffsetsRequest> prepareListOffsetsRequests( MetadataUpdater metadataUpdater, long tableId, diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 9da29c0ea..a7dbec47c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -54,6 +54,7 @@ import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse; import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse; import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse; import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; +import org.apache.fluss.rpc.messages.GetTableStatsRequest; import org.apache.fluss.rpc.messages.ListDatabasesResponse; import org.apache.fluss.rpc.messages.ListOffsetsRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; @@ -84,6 +85,7 @@ import 
org.apache.fluss.rpc.messages.PbRebalanceProgressForTable; import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile; import org.apache.fluss.rpc.messages.PbRenameColumn; import org.apache.fluss.rpc.messages.PbTableBucket; +import org.apache.fluss.rpc.messages.PbTableStatsReqForBucket; import org.apache.fluss.rpc.messages.PrefixLookupRequest; import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.PutKvRequest; @@ -755,4 +757,29 @@ public class ClientRpcMessageUtils { return databaseSummaries; } + + public static GetTableStatsRequest makeGetTableStatsRequest(List<TableBucket> buckets) { + if (buckets.isEmpty()) { + throw new IllegalArgumentException("Buckets list cannot be empty"); + } + long tableId = buckets.get(0).getTableId(); + List<PbTableStatsReqForBucket> pbBuckets = + buckets.stream() + .map( + bucket -> { + if (bucket.getTableId() != tableId) { + throw new IllegalArgumentException( + "All buckets should belong to the same table"); + } + PbTableStatsReqForBucket pbBucket = + new PbTableStatsReqForBucket() + .setBucketId(bucket.getBucket()); + if (bucket.getPartitionId() != null) { + pbBucket.setPartitionId(bucket.getPartitionId()); + } + return pbBucket; + }) + .collect(Collectors.toList()); + return new GetTableStatsRequest().setTableId(tableId).addAllBucketsReqs(pbBuckets); + } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index db1c0f01c..d2522d1eb 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.table.Table; +import 
org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.rebalance.ServerTag; @@ -1843,4 +1844,176 @@ class FlussAdminITCase extends ClientToServerITCaseBase { admin.createTable(tablePath6, tableDescriptor6, false).get(); assertThat(admin.tableExists(tablePath6).get()).isTrue(); } + + // ==================== Table Statistics Tests ==================== + + @Test + void testGetTableStatsForPrimaryKeyTableWithFailover() throws Exception { + // Create a primary key table + TablePath tablePath = TablePath.of("test_db", "test_pk_table_stats"); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(DEFAULT_SCHEMA).distributedBy(3, "id").build(); + long tableId = createTable(tablePath, tableDescriptor, true); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + + // Initially, row count should be 0 + assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(0); + + // Insert some data + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 0; i < 100; i++) { + upsertWriter.upsert(row(i, "name" + i, i % 50)); + } + upsertWriter.flush(); + } + assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(100); + + // Delete some rows, update some rows, insert some rows + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 0; i < 30; i++) { + upsertWriter.delete(row(i, null, null)); + } + upsertWriter.flush(); + } + assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(70); + + // snapshot the row count + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath); + // restart all the servers + for (int i = 0; i < FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().size(); i++) { + FLUSS_CLUSTER_EXTENSION.stopTabletServer(i); + 
FLUSS_CLUSTER_EXTENSION.startTabletServer(i); + } + + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + + // reconnect with the restarted cluster + Connection newConn = ConnectionFactory.createConnection(clientConf); + try (Table table = newConn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + // update existed rows shouldn't increase row count + for (int i = 50; i < 90; i++) { + upsertWriter.upsert(row(i, "updated_name" + i, i % 50)); + } + for (int i = 200; i < 300; i++) { + upsertWriter.upsert(row(i, "name" + i, i % 50)); + } + upsertWriter.flush(); + } + // Final row count should be 170 + assertThat(newConn.getAdmin().getTableStats(tablePath).get().getRowCount()).isEqualTo(170); + newConn.close(); + } + + @Test + void testGetTableStatsForLogTable() throws Exception { + // Create a log table (no primary key) + Schema logTableSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .build(); + TablePath tablePath = TablePath.of("test_db", "test_log_table_stats"); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(logTableSchema).distributedBy(3).build(); + long tableId = createTable(tablePath, tableDescriptor, true); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + + // Initially, row count should be 0 + assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(0); + + // Append some data + try (Table table = conn.getTable(tablePath)) { + AppendWriter appendWriter = table.newAppend().createWriter(); + for (int i = 0; i < 50; i++) { + appendWriter.append(row(i, "name" + i, i % 30)); + } + appendWriter.flush(); + } + + assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(50); + } + + @Test + void testGetTableStatsForPartitionedTable() throws Exception { + // Create a partitioned primary key table + Schema partitionedSchema = + Schema.newBuilder() + .primaryKey("id", "dt") + 
.column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("dt", DataTypes.STRING()) + .build(); + TablePath tablePath = TablePath.of("test_db", "test_partitioned_table_stats"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(partitionedSchema) + .distributedBy(2, "id") + .partitionedBy("dt") + .build(); + long tableId = createTable(tablePath, tableDescriptor, true); + + // Create partitions + admin.createPartition(tablePath, newPartitionSpec("dt", "2024-01-01"), false).get(); + admin.createPartition(tablePath, newPartitionSpec("dt", "2024-01-02"), false).get(); + admin.listPartitionInfos(tablePath) + .join() + .forEach( + partition -> { + FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady( + tableId, partition.getPartitionId()); + }); + + // Initially, row count should be 0 + assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(0); + + // Insert data into different partitions + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + // 30 rows in partition 2024-01-01 + for (int i = 0; i < 30; i++) { + upsertWriter.upsert(row(i, "name" + i, "2024-01-01")); + } + // 20 rows in partition 2024-01-02 + for (int i = 0; i < 20; i++) { + upsertWriter.upsert(row(i, "name" + i, "2024-01-02")); + } + upsertWriter.flush(); + } + // Total row count should be 50 + assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(50); + } + + @Test + void testGetTableStatsForWalChangelogModeTable() throws Exception { + // Create a primary key table with WAL changelog mode (row count disabled) + TablePath tablePath = TablePath.of("test_db", "test_wal_changelog_table_stats"); + Map<String, String> properties = new HashMap<>(); + properties.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .distributedBy(3, "id") + .properties(properties) + .build(); + 
createTable(tablePath, tableDescriptor, true); + + // Insert some data + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 0; i < 10; i++) { + upsertWriter.upsert(row(i, "name" + i, i % 5)); + } + upsertWriter.flush(); + } + + // Getting table stats should throw exception for WAL changelog mode tables + assertThatThrownBy(() -> admin.getTableStats(tablePath).get()) + .cause() + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("Row count is disabled for this table"); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableStats.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableStats.java new file mode 100644 index 000000000..4f4022b55 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableStats.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.metadata; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Objects; + +/** + * Statistics of a table. 
+ * + * @since 0.9 + */ +@PublicEvolving +public class TableStats { + + private final long rowCount; + + public TableStats(long rowCount) { + this.rowCount = rowCount; + } + + /** Returns the current total row count of the table. */ + public long getRowCount() { + return rowCount; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + TableStats that = (TableStats) o; + return rowCount == that.rowCount; + } + + @Override + public int hashCode() { + return Objects.hashCode(rowCount); + } + + @Override + public String toString() { + return "TableStats{" + "rowCount=" + rowCount + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 4af25571f..83b4dd29e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -284,7 +284,7 @@ public class FlinkTableSource results = Collections.singleton( GenericRowData.of( - PushdownUtils.countLogTable(tablePath, flussConfig))); + PushdownUtils.countTable(tablePath, flussConfig))); } TypeInformation<RowData> resultTypeInfo = @@ -593,7 +593,6 @@ public class FlinkTableSource // Only supports 'select count(*)/count(1) from source' for log table now. if (streaming || aggregateExpressions.size() != 1 - || hasPrimaryKey() || groupingSets.size() > 1 || (groupingSets.size() == 1 && groupingSets.get(0).length > 0) // The count pushdown feature is not supported when the data lake is enabled. 
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java index 16cf55a41..370782607 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java @@ -22,8 +22,6 @@ import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.admin.ListOffsetsResult; import org.apache.fluss.client.admin.OffsetSpec; -import org.apache.fluss.client.admin.OffsetSpec.EarliestSpec; -import org.apache.fluss.client.admin.OffsetSpec.LatestSpec; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.scanner.Scan; import org.apache.fluss.client.table.scanner.batch.BatchScanUtils; @@ -31,12 +29,14 @@ import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.UnsupportedVersionException; import org.apache.fluss.flink.source.lookup.FlinkLookupFunction; import org.apache.fluss.flink.source.lookup.LookupNormalizer; import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metadata.TableStats; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.GenericRow; @@ -76,6 +76,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.fluss.flink.source.lookup.LookupNormalizer.createPrimaryKeyLookupNormalizer; +import static org.apache.fluss.utils.ExceptionUtils.findThrowable; /** Utilities for 
pushdown abilities. */ public class PushdownUtils { @@ -365,34 +366,53 @@ public class PushdownUtils { } } - public static long countLogTable(TablePath tablePath, Configuration flussConfig) { + public static long countTable(TablePath tablePath, Configuration flussConfig) { try (Connection connection = ConnectionFactory.createConnection(flussConfig); Admin flussAdmin = connection.getAdmin()) { - TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get(); - int bucketCount = tableInfo.getNumBuckets(); - Collection<Integer> buckets = - IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()); - List<PartitionInfo> partitionInfos; - if (tableInfo.isPartitioned()) { - partitionInfos = flussAdmin.listPartitionInfos(tablePath).get(); - } else { - partitionInfos = Collections.singletonList(null); - } - - List<CompletableFuture<Long>> countFutureList = - offsetLengthes(flussAdmin, tablePath, partitionInfos, buckets); - // wait for all the response - CompletableFuture.allOf(countFutureList.toArray(new CompletableFuture[0])).join(); - long count = 0; - for (CompletableFuture<Long> countFuture : countFutureList) { - count += countFuture.get(); + try { + TableStats tableStats = flussAdmin.getTableStats(tablePath).get(); + return tableStats.getRowCount(); + } catch (Exception e) { + if (findThrowable(e, UnsupportedVersionException.class).isPresent()) { + // if the server doesn't support getTableStats, we fallback to countLogTable, + // which is less efficient. + return countLogTable(flussAdmin, tablePath); + } else { + throw e; + } } - return count; } catch (Exception e) { throw new FlussRuntimeException(e); } } + /** + * We keep this method for back compatibility for old clusters (< 0.9), but it's not efficient. + * We should use countTable instead in the future. 
+ */ + private static long countLogTable(Admin flussAdmin, TablePath tablePath) throws Exception { + TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get(); + int bucketCount = tableInfo.getNumBuckets(); + Collection<Integer> buckets = + IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()); + List<PartitionInfo> partitionInfos; + if (tableInfo.isPartitioned()) { + partitionInfos = flussAdmin.listPartitionInfos(tablePath).get(); + } else { + partitionInfos = Collections.singletonList(null); + } + + List<CompletableFuture<Long>> countFutureList = + offsetLengthes(flussAdmin, tablePath, partitionInfos, buckets); + // wait for all the response + CompletableFuture.allOf(countFutureList.toArray(new CompletableFuture[0])).join(); + long count = 0; + for (CompletableFuture<Long> countFuture : countFutureList) { + count += countFuture.get(); + } + return count; + } + private static List<CompletableFuture<Long>> offsetLengthes( Admin flussAdmin, TablePath tablePath, @@ -402,9 +422,19 @@ public class PushdownUtils { for (@Nullable PartitionInfo info : partitionInfos) { String partitionName = info != null ? 
info.getPartitionName() : null; ListOffsetsResult earliestOffsets = - listOffsets(flussAdmin, tablePath, buckets, new EarliestSpec(), partitionName); + listOffsets( + flussAdmin, + tablePath, + buckets, + new OffsetSpec.EarliestSpec(), + partitionName); ListOffsetsResult latestOffsets = - listOffsets(flussAdmin, tablePath, buckets, new LatestSpec(), partitionName); + listOffsets( + flussAdmin, + tablePath, + buckets, + new OffsetSpec.LatestSpec(), + partitionName); CompletableFuture<Long> apply = earliestOffsets .all() diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java index ee404eeb4..c8dbc0dc0 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java @@ -45,6 +45,7 @@ import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metadata.TableStats; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; @@ -222,6 +223,11 @@ public class TestAdminAdapter implements Admin { throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); } + @Override + public CompletableFuture<TableStats> getTableStats(TablePath tablePath) { + throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); + } + @Override public CompletableFuture<Collection<AclBinding>> listAcls(AclBindingFilter aclBindingFilter) { throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index 03f853f61..f449477df 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -20,6 +20,7 @@ package org.apache.fluss.flink.source; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TablePath; @@ -352,9 +353,11 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { @ParameterizedTest @ValueSource(booleans = {true, false}) - void testCountPushDown(boolean partitionTable) throws Exception { - String tableName = partitionTable ? preparePartitionedLogTable() : prepareLogTable(); - int expectedRows = partitionTable ? 10 : 5; + void testCountPushDownForPkTable(boolean partitionTable) throws Exception { + String tableName = + partitionTable + ? prepareSourceTable(new String[] {"id", "dt"}, "dt") + : prepareSourceTable(new String[] {"id"}, null); // normal scan String query = String.format("SELECT COUNT(*) FROM %s", tableName); assertThat(tEnv.explainSql(query)) @@ -362,7 +365,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { "aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]"); CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect(); List<String> collected = collectRowsWithTimeout(iterRows, 1); - List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows)); + List<String> expected = Collections.singletonList("+I[5]"); assertThat(collected).isEqualTo(expected); // test not push down grouping count. 
@@ -375,15 +378,51 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase { .wait()) .hasMessageContaining( "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + } + + @Test + void testCountPushDownWithWALMode() throws Exception { + String tableName = "test_count_table_with_wal"; + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " address varchar," + + " name varchar," + + " primary key (id) NOT ENFORCED)" + + " with ('bucket.num' = '4', 'table.changelog.image' = 'wal')", + tableName)) + .await(); + // normal scan + String query = String.format("SELECT COUNT(*) FROM %s", tableName); + assertThatThrownBy(() -> tEnv.executeSql(query)) + .hasRootCauseInstanceOf(InvalidTableException.class) + .hasMessageContaining( + "Row count is disabled for this table 'defaultdb.test_count_table_with_wal'."); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCountPushDownForLogTable(boolean partitionTable) throws Exception { + String tableName = partitionTable ? preparePartitionedLogTable() : prepareLogTable(); + int expectedRows = partitionTable ? 10 : 5; + // normal scan + String query = String.format("SELECT COUNT(*) FROM %s", tableName); + assertThat(tEnv.explainSql(query)) + .contains( + "aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]"); + CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect(); + List<String> collected = collectRowsWithTimeout(iterRows, 1); + List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows)); + assertThat(collected).isEqualTo(expected); - // test not support primary key now - String primaryTableName = prepareSourceTable(new String[] {"id"}, null); + // test not push down grouping count. 
assertThatThrownBy( () -> tEnv.explainSql( String.format( - "SELECT COUNT(*) FROM %s ", - primaryTableName)) + "SELECT COUNT(*) FROM %s group by id", + tableName)) .wait()) .hasMessageContaining( "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/TableStatsResultForBucket.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/TableStatsResultForBucket.java new file mode 100644 index 000000000..cdea0109e --- /dev/null +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/TableStatsResultForBucket.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.rpc.entity; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.rpc.protocol.ApiError; + +/** Result of {@link org.apache.fluss.rpc.messages.GetTableStatsResponse} for each table bucket. 
*/ +public class TableStatsResultForBucket extends ResultForBucket { + + private final long rowCount; + + public TableStatsResultForBucket(TableBucket tableBucket, long rowCount) { + super(tableBucket); + this.rowCount = rowCount; + } + + public TableStatsResultForBucket(TableBucket tableBucket, ApiError error) { + super(tableBucket, error); + this.rowCount = -1; + } + + /** Returns the row count of the table bucket. If the request is failed, it will return -1. */ + public long getRowCount() { + return rowCount; + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java index 578b74e5e..6f20828b5 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java @@ -20,6 +20,8 @@ package org.apache.fluss.rpc.gateway; import org.apache.fluss.rpc.RpcGateway; import org.apache.fluss.rpc.messages.FetchLogRequest; import org.apache.fluss.rpc.messages.FetchLogResponse; +import org.apache.fluss.rpc.messages.GetTableStatsRequest; +import org.apache.fluss.rpc.messages.GetTableStatsResponse; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.LimitScanRequest; @@ -130,6 +132,14 @@ public interface TabletServerGateway extends RpcGateway, AdminReadOnlyGateway { @RPC(api = ApiKeys.LIMIT_SCAN) CompletableFuture<LimitScanResponse> limitScan(LimitScanRequest request); + /** + * Get statistics for the specified table buckets. + * + * @return the table stats response containing per-bucket statistics. + */ + @RPC(api = ApiKeys.GET_TABLE_STATS) + CompletableFuture<GetTableStatsResponse> getTableStats(GetTableStatsRequest request); + /** * List offsets for the specified table bucket. 
* diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index f2e62c68a..a3fd7afbd 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -315,6 +315,7 @@ final class ServerConnection { version = serverApiVersions.highestAvailableVersion(apiKey); } catch (Exception e) { responseFuture.completeExceptionally(e); + return responseFuture; } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index f7cc45007..f3fadec64 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -100,7 +100,8 @@ public enum ApiKeys { DELETE_PRODUCER_OFFSETS(1055, 0, 0, PUBLIC), ACQUIRE_KV_SNAPSHOT_LEASE(1056, 0, 0, PUBLIC), RELEASE_KV_SNAPSHOT_LEASE(1057, 0, 0, PUBLIC), - DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC); + DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC), + GET_TABLE_STATS(1059, 0, 0, PUBLIC); private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 66f09f0dc..f04b30756 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -288,6 +288,21 @@ message LimitScanResponse{ } +// Get table statistics request and response. +// Sent to TabletServer to get per-bucket statistics. +message GetTableStatsRequest { + required int64 table_id = 1; + repeated PbTableStatsReqForBucket buckets_req = 2; + // Column indexes for column-level stats (e.g., ndv, sum). + // If empty, column stats are not collected. + // Reserved for future use. 
+ repeated int32 target_columns = 3 [packed = true]; +} + +message GetTableStatsResponse { + repeated PbTableStatsRespForBucket buckets_resp = 1; +} + // notify bucket leader and isr request message NotifyLeaderAndIsrRequest { required int32 coordinator_epoch = 1; @@ -1184,4 +1199,35 @@ message PbKvSnapshotLeaseForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; required int64 snapshot_id = 3; +} + +message PbTableStatsReqForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; +} + +message PbTableStatsRespForBucket { + optional int32 error_code = 1; + optional string error_message = 2; + optional int64 partition_id = 3; + required int32 bucket_id = 4; + + // --- Table-level stats --- + // The number of rows in this bucket. + // For KV tables: the number of unique keys (live rows). + // For Log tables: total number of log records (highWatermark - logStartOffset). + // Absent if row count is not available (e.g., WAL changelog mode or legacy tables). + optional int64 row_count = 5; + + // --- data size stats (future) --- + // The data size in bytes of this bucket. + // For KV tables: the size of the KV store. + // For Log tables: the size of the log segments. + // Reserved for future use. + // optional int64 data_size_bytes = 6; + + // --- Column-level stats (future) --- + // Per-column statistics, keyed by column index. + // Only populated when target_columns is specified in the request. 
+ // repeated PbColumnStats column_stats = 10; } \ No newline at end of file diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 7773ea5da..8d3a4af30 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -39,6 +39,8 @@ import org.apache.fluss.rpc.messages.GetTableInfoRequest; import org.apache.fluss.rpc.messages.GetTableInfoResponse; import org.apache.fluss.rpc.messages.GetTableSchemaRequest; import org.apache.fluss.rpc.messages.GetTableSchemaResponse; +import org.apache.fluss.rpc.messages.GetTableStatsRequest; +import org.apache.fluss.rpc.messages.GetTableStatsResponse; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.LimitScanRequest; @@ -136,6 +138,11 @@ public class TestingTabletGatewayService extends TestingGatewayService return null; } + @Override + public CompletableFuture<GetTableStatsResponse> getTableStats(GetTableStatsRequest request) { + return null; + } + @Override public CompletableFuture<ListOffsetsResponse> listOffsets(ListOffsetsRequest request) { return null; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java index 5d32e413e..e200f8d60 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java @@ -150,9 +150,9 @@ public class KvRecoverHelper { if (recoverPointRowCount != null) { kvTablet.setRowCount(rowCountUpdater.getRowCount()); LOG.info( - "Updated row count to {} for table '{}' after recovering from log", + "Updated row count to {} for tablet '{}' after recovering from log", 
rowCountUpdater.getRowCount(), - kvTablet.getTablePath()); + kvTablet.getTableBucket()); } else { LOG.info( "Skipping row count update after recovering from log, because this table '{}' doesn't support row count.", @@ -163,7 +163,10 @@ public class KvRecoverHelper { ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordApplier = (resumeRecord) -> kvTablet.putToPreWriteBuffer( - resumeRecord.key, resumeRecord.value, resumeRecord.logOffset); + resumeRecord.changeType, + resumeRecord.key, + resumeRecord.value, + resumeRecord.logOffset); readLogRecordsAndApply( nextLogOffset, // records in pre-write-buffer shouldn't affect the row count, the high-watermark @@ -178,10 +181,10 @@ public class KvRecoverHelper { AutoIncIDRange newRange = autoIncIdRangeUpdater.getNewRange(); kvTablet.updateAutoIncrementIDRange(newRange); LOG.info( - "Updated auto inc id range to [{}, {}] for table '{}' after recovering from log", + "Updated auto inc id range to [{}, {}] for tablet '{}' after recovering from log", newRange.getStart(), newRange.getEnd(), - kvTablet.getTablePath()); + kvTablet.getTableBucket()); } } @@ -230,7 +233,7 @@ public class KvRecoverHelper { } resumeRecordConsumer.accept( new KeyValueAndLogOffset( - key, value, logRecord.logOffset())); + changeType, key, value, logRecord.logOffset())); // reuse the logRow instance which is usually a CompactedRow which // has been deserialized during toKvRow(..) 
@@ -301,11 +304,14 @@ public class KvRecoverHelper { } private static final class KeyValueAndLogOffset { + private final ChangeType changeType; private final byte[] key; private final @Nullable byte[] value; private final long logOffset; - public KeyValueAndLogOffset(byte[] key, byte[] value, long logOffset) { + public KeyValueAndLogOffset( + ChangeType changeType, byte[] key, byte[] value, long logOffset) { + this.changeType = changeType; this.key = key; this.value = value; this.logOffset = logOffset; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index ca89fe829..e72428a02 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -22,6 +22,7 @@ import org.apache.fluss.compression.ArrowCompressionInfo; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.DeletionDisabledException; +import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.KvStorageException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.memory.MemorySegmentPool; @@ -295,6 +296,20 @@ public final class KvTablet { this.rowCount = rowCount; } + // row_count is volatile, so it's safe to read without lock + public long getRowCount() { + if (rowCount == ROW_COUNT_DISABLED) { + throw new InvalidTableException( + String.format( + "Row count is disabled for this table '%s'. This usually happens when the table is " + + "created before v0.9 or the changelog image is set to WAL, " + + "as maintaining row count in WAL mode is costly and not necessary for most use cases. 
" + + "If you want to enable row count, please set changelog image to FULL.", + getTablePath())); + } + return rowCount; + } + /** * Get the current state of the tablet, including the log offset, row count and auto-increment * ID range. This is used for snapshot and recovery to capture the state of the tablet at a @@ -589,7 +604,7 @@ public final class KvTablet { throws Exception { BinaryValue newValue = autoIncrementUpdater.updateAutoIncrementColumns(currentValue); walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(newValue.row)); - kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset); + kvPreWriteBuffer.insert(key, newValue.encodeValue(), logOffset); return logOffset + 1; } @@ -603,12 +618,12 @@ public final class KvTablet { throws Exception { if (changelogImage == ChangelogImage.WAL) { walBuilder.append(ChangeType.UPDATE_AFTER, latestSchemaRow.replaceRow(newValue.row)); - kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset); + kvPreWriteBuffer.update(key, newValue.encodeValue(), logOffset); return logOffset + 1; } else { walBuilder.append(ChangeType.UPDATE_BEFORE, latestSchemaRow.replaceRow(oldValue.row)); walBuilder.append(ChangeType.UPDATE_AFTER, latestSchemaRow.replaceRow(newValue.row)); - kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset + 1); + kvPreWriteBuffer.update(key, newValue.encodeValue(), logOffset + 1); return logOffset + 2; } } @@ -679,12 +694,18 @@ public final class KvTablet { } /** put key,value,logOffset into pre-write buffer directly. 
*/ - void putToPreWriteBuffer(byte[] key, @Nullable byte[] value, long logOffset) { + void putToPreWriteBuffer( + ChangeType changeType, byte[] key, @Nullable byte[] value, long logOffset) { KvPreWriteBuffer.Key wrapKey = KvPreWriteBuffer.Key.of(key); - if (value == null) { + if (changeType == ChangeType.DELETE && value == null) { kvPreWriteBuffer.delete(wrapKey, logOffset); + } else if (changeType == ChangeType.INSERT) { + kvPreWriteBuffer.insert(wrapKey, value, logOffset); + } else if (changeType == ChangeType.UPDATE_AFTER) { + kvPreWriteBuffer.update(wrapKey, value, logOffset); } else { - kvPreWriteBuffer.put(wrapKey, value, logOffset); + throw new IllegalArgumentException( + "Unsupported change type for putToPreWriteBuffer: " + changeType); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java index 5908ba4b1..e14d09eac 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java @@ -20,6 +20,7 @@ package org.apache.fluss.server.kv.prewrite; import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.metrics.Counter; +import org.apache.fluss.record.ChangeType; import org.apache.fluss.server.kv.KvBatchWriter; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.utils.MurmurHashUtils; @@ -114,7 +115,7 @@ public class KvPreWriteBuffer implements AutoCloseable { * @param logSequenceNumber the log sequence number for the delete operation */ public void delete(Key key, long logSequenceNumber) { - update(key, Value.of(null), logSequenceNumber); + doPut(ChangeType.DELETE, key, Value.of(null), logSequenceNumber); } /** @@ -122,11 +123,20 @@ public class KvPreWriteBuffer implements AutoCloseable { * * @param 
logSequenceNumber the log sequence number for the put operation */ - public void put(Key key, @Nullable byte[] value, long logSequenceNumber) { - update(key, Value.of(value), logSequenceNumber); + public void insert(Key key, byte[] value, long logSequenceNumber) { + doPut(ChangeType.INSERT, key, Value.of(value), logSequenceNumber); } - private void update(Key key, Value value, long lsn) { + /** + * Update a key-value pair. + * + * @param logSequenceNumber the log sequence number for the update operation + */ + public void update(Key key, @Nullable byte[] value, long logSequenceNumber) { + doPut(ChangeType.UPDATE_AFTER, key, Value.of(value), logSequenceNumber); + } + + private void doPut(ChangeType changeType, Key key, Value value, long lsn) { if (maxLogSequenceNumber >= lsn) { throw new IllegalArgumentException( "The log sequence number must be non-decreasing. " @@ -142,8 +152,8 @@ key, (k, v) -> v == null - ? KvEntry.of(key, value, lsn) - : KvEntry.of(key, value, lsn, v)); + ? KvEntry.of(changeType, key, value, lsn) + : KvEntry.of(changeType, key, value, lsn, v)); // append the entry to the tail of the list for all kv entries allKvEntries.addLast(kvEntry); // update the max lsn @@ -220,14 +230,19 @@ Value value = entry.getValue(); if (value.value != null) { flushedCount += 1; - rowCountDiff += 1; kvBatchWriter.put(entry.getKey().key, value.value); } else { flushedCount += 1; - rowCountDiff -= 1; kvBatchWriter.delete(entry.getKey().key); } + // for update_after, we don't change the row count + if (entry.getChangeType() == ChangeType.INSERT) { + rowCountDiff += 1; + } else if (entry.getChangeType() == ChangeType.DELETE) { + rowCountDiff -= 1; + } + // if the kv entry to be flushed is equal to the one in the kvEntryMap, we // can remove it from the map. 
Although it's not a must to remove from the map, // we remove it to reduce the memory usage @@ -271,6 +286,10 @@ public class KvPreWriteBuffer implements AutoCloseable { return truncateAsErrorCount; } + // ------------------------------------------------------------------------------------------- + // Inner classes + // ------------------------------------------------------------------------------------------- + /** * A class to wrap a key-value pair and the sequence number for the key-value pair. If the byte * array in the value is null, it means the key in the entry is marked as deleted. @@ -280,6 +299,7 @@ public class KvPreWriteBuffer implements AutoCloseable { */ public static class KvEntry { + private final ChangeType changeType; private final Key key; private final Value value; private final long logSequenceNumber; @@ -287,22 +307,36 @@ public class KvPreWriteBuffer implements AutoCloseable { // the previous mapped value in the buffer before this key-value put @Nullable private final KvEntry previousEntry; - public static KvEntry of(Key key, Value value, long sequenceNumber) { - return new KvEntry(key, value, sequenceNumber, null); + public static KvEntry of(ChangeType changeType, Key key, Value value, long sequenceNumber) { + return new KvEntry(changeType, key, value, sequenceNumber, null); } - public static KvEntry of(Key key, Value value, long sequenceNumber, KvEntry previousEntry) { - return new KvEntry(key, value, sequenceNumber, previousEntry); + public static KvEntry of( + ChangeType changeType, + Key key, + Value value, + long sequenceNumber, + KvEntry previousEntry) { + return new KvEntry(changeType, key, value, sequenceNumber, previousEntry); } private KvEntry( - Key key, Value value, long logSequenceNumber, @Nullable KvEntry previousEntry) { + ChangeType changeType, + Key key, + Value value, + long logSequenceNumber, + @Nullable KvEntry previousEntry) { + this.changeType = changeType; this.key = key; this.value = value; this.logSequenceNumber = 
logSequenceNumber; this.previousEntry = previousEntry; } + public ChangeType getChangeType() { + return changeType; + } + public Key getKey() { return key; } @@ -325,6 +359,7 @@ public class KvPreWriteBuffer implements AutoCloseable { } KvEntry kvEntry = (KvEntry) o; return logSequenceNumber == kvEntry.logSequenceNumber + && changeType == kvEntry.changeType && Objects.equals(key, kvEntry.key) && Objects.equals(value, kvEntry.value) && Objects.equals(previousEntry, kvEntry.previousEntry); @@ -338,7 +373,9 @@ public class KvPreWriteBuffer implements AutoCloseable { @Override public String toString() { return "KvEntry{" - + "key=" + + "changeType=" + + changeType + + ", key=" + key + ", value=" + value diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index 371d3c128..a1fa20d5a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -233,6 +233,10 @@ public final class LogTablet { return localLog.getTableBucket(); } + public long getRowCount() { + return getHighWatermark() - logStartOffset(); + } + public long getHighWatermark() { return highWatermarkMetadata.getMessageOffset(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 8a1621a84..10678eb5c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -1379,6 +1379,21 @@ public final class Replica { } } + public long getRowCount() { + return inReadLock( + leaderIsrUpdateLock, + () -> { + KvTablet kv = this.kvTablet; + if (kv != null) { + // return materialized row count for primary key table + return kv.getRowCount(); + } else { + // return log row count for non-primary key table + 
return logTablet.getRowCount(); + } + }); + } + public long getOffset(RemoteLogManager remoteLogManager, ListOffsetsParam listOffsetsParam) throws IOException { return inReadLock( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index aeb963c9b..eea783dba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -54,6 +54,7 @@ import org.apache.fluss.rpc.entity.LookupResultForBucket; import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket; import org.apache.fluss.rpc.entity.ProduceLogResultForBucket; import org.apache.fluss.rpc.entity.PutKvResultForBucket; +import org.apache.fluss.rpc.entity.TableStatsResultForBucket; import org.apache.fluss.rpc.entity.WriteResultForBucket; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetResponse; @@ -1265,6 +1266,23 @@ public class ReplicaManager { return putResultForBucketMap; } + public void getTableStats( + List<TableBucket> tableBucket, + Consumer<List<TableStatsResultForBucket>> responseCallback) { + List<TableStatsResultForBucket> results = new ArrayList<>(); + for (TableBucket tb : tableBucket) { + try { + Replica replica = getReplicaOrException(tb); + long rowCount = replica.getRowCount(); + results.add(new TableStatsResultForBucket(tb, rowCount)); + } catch (Exception e) { + LOG.error("Error getting table stats on replica {}", tb, e); + results.add(new TableStatsResultForBucket(tb, ApiError.fromThrowable(e))); + } + } + responseCallback.accept(results); + } + public void limitScan( TableBucket tableBucket, int limit, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 
df2ab926f..b511f2178 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -32,6 +32,8 @@ import org.apache.fluss.rpc.entity.ResultForBucket; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.FetchLogRequest; import org.apache.fluss.rpc.messages.FetchLogResponse; +import org.apache.fluss.rpc.messages.GetTableStatsRequest; +import org.apache.fluss.rpc.messages.GetTableStatsResponse; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.LimitScanRequest; @@ -106,9 +108,11 @@ import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getNotifySnaps import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getPutKvData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getStopReplicaData; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getTableStatsRequestData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getTargetColumns; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getUpdateMetadataRequestData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeGetTableStatsResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeInitWriterResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLimitScanResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListOffsetsResponse; @@ -306,6 +310,17 @@ public final class TabletService extends RpcServiceBase implements TabletServerG return response; } + @Override + public CompletableFuture<GetTableStatsResponse> 
getTableStats(GetTableStatsRequest request) { + authorizeTable(READ, request.getTableId()); + + CompletableFuture<GetTableStatsResponse> response = new CompletableFuture<>(); + replicaManager.getTableStats( + getTableStatsRequestData(request), + result -> response.complete(makeGetTableStatsResponse(result))); + return response; + } + @Override public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr( NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 092894e0a..6f5672ff4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -57,6 +57,7 @@ import org.apache.fluss.rpc.entity.LookupResultForBucket; import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket; import org.apache.fluss.rpc.entity.ProduceLogResultForBucket; import org.apache.fluss.rpc.entity.PutKvResultForBucket; +import org.apache.fluss.rpc.entity.TableStatsResultForBucket; import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest; import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse; import org.apache.fluss.rpc.messages.AdjustIsrRequest; @@ -74,6 +75,8 @@ import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse; import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse; import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse; import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; +import org.apache.fluss.rpc.messages.GetTableStatsRequest; +import org.apache.fluss.rpc.messages.GetTableStatsResponse; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; import org.apache.fluss.rpc.messages.LimitScanResponse; @@ -146,6 +149,8 @@ import 
org.apache.fluss.rpc.messages.PbTableBucket; import org.apache.fluss.rpc.messages.PbTableMetadata; import org.apache.fluss.rpc.messages.PbTableOffsets; import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.messages.PbTableStatsReqForBucket; +import org.apache.fluss.rpc.messages.PbTableStatsRespForBucket; import org.apache.fluss.rpc.messages.PbValue; import org.apache.fluss.rpc.messages.PbValueList; import org.apache.fluss.rpc.messages.PrefixLookupRequest; @@ -2061,6 +2066,45 @@ public class ServerRpcMessageUtils { leaseForBucket.getSnapshotId()); } + // ----------------------------------------------------------------------------------- + // Table Stats Request and Response + // ----------------------------------------------------------------------------------- + + public static List<TableBucket> getTableStatsRequestData(GetTableStatsRequest request) { + List<TableBucket> tableBuckets = new ArrayList<>(); + long tableId = request.getTableId(); + for (PbTableStatsReqForBucket reqForBucket : request.getBucketsReqsList()) { + tableBuckets.add( + new TableBucket( + tableId, + reqForBucket.hasPartitionId() ? 
reqForBucket.getPartitionId() : null, + reqForBucket.getBucketId())); + } + return tableBuckets; + } + + public static GetTableStatsResponse makeGetTableStatsResponse( + List<TableStatsResultForBucket> stats) { + GetTableStatsResponse response = new GetTableStatsResponse(); + for (TableStatsResultForBucket statForBucket : stats) { + TableBucket tb = statForBucket.getTableBucket(); + PbTableStatsRespForBucket respForBucket = + response.addBucketsResp().setBucketId(tb.getBucket()); + if (tb.getPartitionId() != null) { + respForBucket.setPartitionId(tb.getPartitionId()); + } + if (statForBucket.failed()) { + respForBucket.setError( + statForBucket.getErrorCode(), statForBucket.getErrorMessage()); + } else { + respForBucket.setRowCount(statForBucket.getRowCount()); + } + } + return response; + } + + // ----------------------------------------------------------------------------------- + private static <T> Map<TableBucket, T> mergeResponse( Map<TableBucket, T> response, Map<TableBucket, T> errors) { if (errors.isEmpty()) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 1d59af396..da918f45e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -17,8 +17,10 @@ package org.apache.fluss.server.kv; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.TableConfig; +import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.InvalidTargetColumnException; import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.memory.TestingMemorySegmentPool; @@ -715,16 +717,19 @@ class KvTabletTest { KvEntry kv1 = KvEntry.of( + ChangeType.INSERT, Key.of("k1".getBytes()), valueOf(compactedRow(baseRowType, new Object[] {1, "v11"})), 0); 
KvEntry kv2 = KvEntry.of( + ChangeType.INSERT, Key.of("k2".getBytes()), valueOf(compactedRow(baseRowType, new Object[] {2, "v21"})), 1); KvEntry kv3 = KvEntry.of( + ChangeType.UPDATE_AFTER, Key.of("k2".getBytes()), valueOf(compactedRow(baseRowType, new Object[] {2, "v23"})), 3, @@ -1664,4 +1669,142 @@ class KvTabletTest { assertThat(statistics.getBlockCacheMemoryUsage()).isEqualTo(0); assertThat(statistics.getBlockCachePinnedUsage()).isEqualTo(0); } + + // ==================== Row Count Tests ==================== + + @Test + void testRowCountBasic() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + // Initially row count should be 0 + assertThat(kvTablet.getRowCount()).isEqualTo(0); + + // Insert 5 records + for (int i = 1; i <= 5; i++) { + KvRecordBatch batch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("key" + i, new Object[] {i, "val" + i})); + kvTablet.putAsLeader(batch, null); + } + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + // Row count should be 5 + assertThat(kvTablet.getRowCount()).isEqualTo(5); + + // Delete 2 records + for (int i = 1; i <= 2; i++) { + KvRecordBatch deleteBatch = + kvRecordBatchFactory.ofRecords(kvRecordFactory.ofRecord("key" + i, null)); + kvTablet.putAsLeader(deleteBatch, null); + } + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + // Row count should be 3 + assertThat(kvTablet.getRowCount()).isEqualTo(3); + + kvTablet.close(); + } + + @Test + void testRowCountWithUpsert() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + // Initially row count should be 0 + assertThat(kvTablet.getRowCount()).isEqualTo(0); + + // Insert a record + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("key1", new Object[] {1, "val1"})); + kvTablet.putAsLeader(batch1, null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + assertThat(kvTablet.getRowCount()).isEqualTo(1); + + // Update the same 
record (upsert) - row count should not change + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("key1", new Object[] {1, "val1_updated"})); + kvTablet.putAsLeader(batch2, null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + assertThat(kvTablet.getRowCount()).isEqualTo(1); + + // Insert another record + KvRecordBatch batch3 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("key2", new Object[] {2, "val2"})); + kvTablet.putAsLeader(batch3, null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + assertThat(kvTablet.getRowCount()).isEqualTo(2); + + kvTablet.close(); + } + + @Test + void testRowCountDisabledForWalChangelog() throws Exception { + // Create KvTablet with WAL changelog mode + Map<String, String> tableConfig = new HashMap<>(); + tableConfig.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL"); + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, tableConfig); + + // Insert some records + KvRecordBatch batch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("key1", new Object[] {1, "val1"})); + kvTablet.putAsLeader(batch, null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + // Getting row count should throw exception for WAL changelog mode + assertThatThrownBy(() -> kvTablet.getRowCount()) + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("Row count is disabled for this table"); + + kvTablet.close(); + } + + @Test + void testRowCountWithMixedOperations() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + // Initially row count should be 0 + assertThat(kvTablet.getRowCount()).isEqualTo(0); + + // Batch of mixed operations: insert 10 records + List<KvRecord> records = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + records.add(kvRecordFactory.ofRecord("key" + i, new Object[] {i, "val" + i})); + } + KvRecordBatch insertBatch = kvRecordBatchFactory.ofRecords(records); + kvTablet.putAsLeader(insertBatch, 
null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + assertThat(kvTablet.getRowCount()).isEqualTo(10); + + // Delete some, insert new ones, update existing ones in sequence + // Delete keys 1-3 + List<KvRecord> deleteRecords = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + deleteRecords.add(kvRecordFactory.ofRecord("key" + i, null)); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(deleteRecords), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + assertThat(kvTablet.getRowCount()).isEqualTo(7); + + // Insert new keys 11-15 + List<KvRecord> newRecords = new ArrayList<>(); + for (int i = 11; i <= 15; i++) { + newRecords.add(kvRecordFactory.ofRecord("key" + i, new Object[] {i, "val" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(newRecords), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + assertThat(kvTablet.getRowCount()).isEqualTo(12); + + // Update existing key 4 - row count should not change + KvRecordBatch updateBatch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("key4", new Object[] {4, "val4_updated"})); + kvTablet.putAsLeader(updateBatch, null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + assertThat(kvTablet.getRowCount()).isEqualTo(12); + + kvTablet.close(); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java index b39ec39ac..cb07c2d66 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; +import java.io.IOException; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -36,10 +38,10 @@ class 
KvPreWriteBufferTest { KvPreWriteBuffer buffer = new KvPreWriteBuffer( new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); - bufferPut(buffer, "key1", "value1", 1); + bufferInsert(buffer, "key1", "value1", 1); bufferDelete(buffer, "key1", 3); - assertThatThrownBy(() -> bufferPut(buffer, "key2", "value2", 2)) + assertThatThrownBy(() -> bufferInsert(buffer, "key2", "value2", 2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "The log sequence number must be non-decreasing. The current " @@ -61,7 +63,7 @@ class KvPreWriteBufferTest { // put a series of kv entries for (int i = 0; i < 3; i++) { - bufferPut(buffer, "key" + i, "value" + i, elementCount++); + bufferInsert(buffer, "key" + i, "value" + i, elementCount++); } // check the key and value; for (int i = 0; i < 3; i++) { @@ -96,7 +98,7 @@ class KvPreWriteBufferTest { assertThat(getValue(buffer, "key2")).isNull(); // put key2 again - bufferPut(buffer, "key2", "value21", elementCount++); + bufferInsert(buffer, "key2", "value21", elementCount++); // we can get key2 assertThat(getValue(buffer, "key2")).isEqualTo("value21"); @@ -113,9 +115,9 @@ class KvPreWriteBufferTest { } // put two key3; - bufferPut(buffer, "key3", "value31", elementCount++); - bufferPut(buffer, "key3", "value32", elementCount++); - bufferPut(buffer, "key2", "value22", elementCount++); + bufferInsert(buffer, "key3", "value31", elementCount++); + bufferInsert(buffer, "key3", "value32", elementCount++); + bufferInsert(buffer, "key2", "value22", elementCount++); // check get key3 get the latest value assertThat(getValue(buffer, "key3")).isEqualTo("value32"); // check get key2 @@ -144,7 +146,7 @@ class KvPreWriteBufferTest { // put a series of kv entries for (int i = 0; i < 10; i++) { - bufferPut(buffer, "key" + i, "value" + i, elementCount++); + bufferInsert(buffer, "key" + i, "value" + i, elementCount++); } // check the key and value; for (int i = 0; i < 10; i++) { @@ -170,8 +172,8 @@ class KvPreWriteBufferTest { 
assertThat(getValue(buffer, "key3")).isNull(); // add update records - bufferPut(buffer, "key2", "value2-1", elementCount++); - bufferPut(buffer, "key1", "value1-1", elementCount++); + bufferInsert(buffer, "key2", "value2-1", elementCount++); + bufferInsert(buffer, "key1", "value1-1", elementCount++); assertThat(getValue(buffer, "key1")).isEqualTo("value1-1"); assertThat(buffer.getMaxLSN()).isEqualTo(elementCount - 1); buffer.truncateTo(5, TruncateReason.ERROR); @@ -190,9 +192,60 @@ class KvPreWriteBufferTest { assertThat(buffer.getKvEntryMap().size()).isEqualTo(0); } - private static void bufferPut( + @Test + void testRowCount() throws IOException { + KvPreWriteBuffer buffer = + new KvPreWriteBuffer( + new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + int elementCount = 0; + + // put a series of kv entries + for (int i = 0; i < 10; i++) { + bufferInsert(buffer, "key" + i, "value" + i, elementCount++); + } + assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(10); + + // delete some keys + for (int i = 0; i < 5; i++) { + bufferDelete(buffer, "key" + i, elementCount++); + } + assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(-5); + + // update an existing key, which should not change the row count + for (int i = 8; i < 9; i++) { + bufferUpdate(buffer, "key" + i, "value" + i, elementCount++); + } + assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(0); + + // insert new keys 10-19, then delete keys 10-12 of them + for (int i = 10; i < 20; i++) { + bufferInsert(buffer, "key" + i, "value" + i, elementCount++); + } + for (int i = 10; i < 13; i++) { + bufferDelete(buffer, "key" + i, elementCount++); + } + // checkpoint here: after truncating back to it, the flushed row count delta should be 10 - 3 = 7 + int checkpoint = elementCount; + for (int i = 30; i < 35; i++) { + bufferInsert(buffer, "key" + i, "value" + i, elementCount++); + } + for (int i = 30; i < 35; i++) { + bufferUpdate(buffer, "key" + i, "value" + i, elementCount++); + } + + // truncate back to the checkpoint, discarding the entries after it + buffer.truncateTo(checkpoint, TruncateReason.ERROR); + assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(7); 
+ } + + private static void bufferInsert( + KvPreWriteBuffer kvPreWriteBuffer, String key, String value, int elementCount) { + kvPreWriteBuffer.insert(toKey(key), value.getBytes(), elementCount); + } + + private static void bufferUpdate( KvPreWriteBuffer kvPreWriteBuffer, String key, String value, int elementCount) { - kvPreWriteBuffer.put(toKey(key), value.getBytes(), elementCount); + kvPreWriteBuffer.update(toKey(key), value.getBytes(), elementCount); } private static void bufferDelete( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java index 932c17f23..8ca1b8e27 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java @@ -30,6 +30,7 @@ import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.PbLookupRespForBucket; import org.apache.fluss.rpc.messages.PutKvRequest; +import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.utils.ExceptionUtils; @@ -44,6 +45,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; @@ -56,6 +58,7 @@ import static org.apache.fluss.testutils.DataTestUtils.genKvRecords; import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs; import static org.apache.fluss.testutils.DataTestUtils.toKvRecordBatch; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; +import static 
org.assertj.core.api.Assertions.assertThat; /** The IT case for the restoring of kv replica. */ class KvReplicaRestoreITCase { @@ -200,6 +203,63 @@ class KvReplicaRestoreITCase { } } + @Test + void testRowCountRecoveryAfterFailover() throws Exception { + // Create a primary key table + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(DATA1_SCHEMA_PK).distributedBy(1, "a").build(); + TablePath tablePath = TablePath.of("test_db", "test_row_count_recovery"); + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + TableBucket tableBucket = new TableBucket(tableId, 0); + + // Wait for replica to become leader + FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); + int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket); + + // Insert 100 records + int recordCount = 100; + List<KvRecord> records = new ArrayList<>(); + for (int i = 0; i < recordCount; i++) { + records.addAll(genKvRecords(new Object[] {i, "value" + i})); + } + putRecordBatch(tableBucket, leaderServer, toKvRecordBatch(records)).join(); + + // Verify initial row count + Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); + assertThat(leaderReplica.getRowCount()).isEqualTo(recordCount); + + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath); + + // Verify the snapshot contains row count + CompletedSnapshot completedSnapshot = + completedSnapshotHandleStore + .getLatestCompletedSnapshotHandle(tableBucket) + .get() + .retrieveCompleteSnapshot(); + assertThat(completedSnapshot.getRowCount()).isNotNull(); + assertThat(completedSnapshot.getRowCount()).isEqualTo(recordCount); + + // Trigger another write to generate changelogs not in snapshot + List<KvRecord> moreRecords = new ArrayList<>(); + for (int i = recordCount; i < recordCount + 10; i++) { + moreRecords.addAll(genKvRecords(new Object[] {i, "value" + i})); + } + recordCount = recordCount + 10; + putRecordBatch(tableBucket, leaderServer, 
toKvRecordBatch(moreRecords)).join(); + + // simulate failure and force failover + int currentLeader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket); + FLUSS_CLUSTER_EXTENSION.stopTabletServer(currentLeader); + FLUSS_CLUSTER_EXTENSION.startTabletServer(currentLeader); + + // Get the new leader replica + Replica newLeaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); + assertThat(newLeaderReplica.getLeaderId()).isNotEqualTo(currentLeader); + + // Verify the row count is restored correctly after failover and applied changelogs + assertThat(newLeaderReplica.getRowCount()).isEqualTo(recordCount); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); @@ -213,13 +273,13 @@ class KvReplicaRestoreITCase { return conf; } - private void putRecordBatch( + private CompletableFuture<?> putRecordBatch( TableBucket tableBucket, int leaderServer, KvRecordBatch kvRecordBatch) { PutKvRequest putKvRequest = newPutKvRequest( tableBucket.getTableId(), tableBucket.getBucket(), -1, kvRecordBatch); TabletServerGateway leaderGateway = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer); - leaderGateway.putKv(putKvRequest); + return leaderGateway.putKv(putKvRequest); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 617026fa2..317dee0a2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -44,6 +44,8 @@ import org.apache.fluss.rpc.messages.GetTableInfoRequest; import org.apache.fluss.rpc.messages.GetTableInfoResponse; import org.apache.fluss.rpc.messages.GetTableSchemaRequest; import org.apache.fluss.rpc.messages.GetTableSchemaResponse; +import 
org.apache.fluss.rpc.messages.GetTableStatsRequest; +import org.apache.fluss.rpc.messages.GetTableStatsResponse; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.LimitScanRequest; @@ -204,6 +206,11 @@ public class TestTabletServerGateway implements TabletServerGateway { return null; } + @Override + public CompletableFuture<GetTableStatsResponse> getTableStats(GetTableStatsRequest request) { + return null; + } + @Override public CompletableFuture<ListOffsetsResponse> listOffsets(ListOffsetsRequest request) { return null;
