This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 6655bdcf2 [client] Support list database summaries (#2515)
6655bdcf2 is described below
commit 6655bdcf2c9c933c7c492016a7708c06301ae8f2
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Feb 5 16:57:00 2026 +0800
[client] Support list database summaries (#2515)
---
.../java/org/apache/fluss/client/admin/Admin.java | 15 +++
.../org/apache/fluss/client/admin/FlussAdmin.java | 9 ++
.../fluss/client/utils/ClientRpcMessageUtils.java | 23 +++++
.../fluss/client/admin/FlussAdminITCase.java | 17 ++++
.../org/apache/fluss/metadata/DatabaseSummary.java | 104 +++++++++++++++++++++
fluss-rpc/src/main/proto/FlussApi.proto | 9 ++
.../org/apache/fluss/server/RpcServiceBase.java | 9 +-
.../ZkNodeChangeNotificationWatcher.java | 4 +-
.../fluss/server/coordinator/MetadataManager.java | 7 ++
.../fluss/server/utils/ServerRpcMessageUtils.java | 14 +++
.../org/apache/fluss/server/zk/ZkAsyncRequest.java | 7 ++
.../apache/fluss/server/zk/ZkAsyncResponse.java | 32 +++++++
.../apache/fluss/server/zk/ZooKeeperClient.java | 50 +++++++++-
.../org/apache/fluss/server/ServerTestBase.java | 5 +-
.../ZkNodeChangeNotificationWatcherTest.java | 2 +-
.../fluss/server/zk/ZooKeeperClientTest.java | 37 ++++++++
16 files changed, 335 insertions(+), 9 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index 6c0d6876e..b6e9ea1d6 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -55,6 +55,7 @@ import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.exception.TooManyPartitionsException;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -179,6 +180,20 @@ public interface Admin extends AutoCloseable {
/** List all databases in fluss cluster asynchronously. */
CompletableFuture<List<String>> listDatabases();
+ /**
+ * List all databases' summary information in fluss cluster
asynchronously. The difference
+ * between this method and {@link #listDatabases()} is that this method
also include some
+ * summaries for the database, like {@link
DatabaseSummary#getCreatedTime()} and {@link
+ * DatabaseSummary#getTableCount()}.
+ *
+ * <p>When interacting older version of fluss cluster which does not
support this API, it will
+ * fall back to {@link #listDatabases()} with {@code -1} value for {@link
+ * DatabaseSummary#getCreatedTime()} and {@link
DatabaseSummary#getTableCount()}.
+ *
+ * @since 0.9
+ */
+ CompletableFuture<List<DatabaseSummary>> listDatabaseSummaries();
+
/**
* Create a new table asynchronously.
*
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 543d65128..9df4bf248 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -33,6 +33,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -245,6 +246,14 @@ public class FlussAdmin implements Admin {
.thenApply(ListDatabasesResponse::getDatabaseNamesList);
}
+ @Override
+ public CompletableFuture<List<DatabaseSummary>> listDatabaseSummaries() {
+ ListDatabasesRequest request = new
ListDatabasesRequest().setIncludeSummary(true);
+ return readOnlyGateway
+ .listDatabases(request)
+ .thenApply(ClientRpcMessageUtils::toDatabaseSummaries);
+ }
+
@Override
public CompletableFuture<Void> createTable(
TablePath tablePath, TableDescriptor tableDescriptor, boolean
ignoreIfExists) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index 68d8712f1..75cafb491 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -36,6 +36,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -50,6 +51,7 @@ import
org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
+import org.apache.fluss.rpc.messages.ListDatabasesResponse;
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
@@ -58,6 +60,7 @@ import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.PbAddColumn;
import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbBucketOffset;
+import org.apache.fluss.rpc.messages.PbDatabaseSummary;
import org.apache.fluss.rpc.messages.PbDescribeConfig;
import org.apache.fluss.rpc.messages.PbDropColumn;
import org.apache.fluss.rpc.messages.PbKeyValue;
@@ -646,4 +649,24 @@ public class ClientRpcMessageUtils {
long expirationTime = response.hasExpirationTime() ?
response.getExpirationTime() : 0;
return new ProducerOffsetsResult(response.getProducerId(),
tableOffsets, expirationTime);
}
+
+ public static List<DatabaseSummary>
toDatabaseSummaries(ListDatabasesResponse response) {
+ List<DatabaseSummary> databaseSummaries = new ArrayList<>();
+ for (PbDatabaseSummary pbDatabaseSummary :
response.getDatabaseSummariesList()) {
+ databaseSummaries.add(
+ new DatabaseSummary(
+ pbDatabaseSummary.getDatabaseName(),
+ pbDatabaseSummary.getCreatedTime(),
+ pbDatabaseSummary.getTableCount()));
+ }
+
+ if (response.getDatabaseNamesCount() > 0 &&
response.getDatabaseSummariesCount() == 0) {
+ // backward-compatibility for older server versions that only
returns database names
+ for (String dbName : response.getDatabaseNamesList()) {
+ databaseSummaries.add(new DatabaseSummary(dbName, -1L, -1));
+ }
+ }
+
+ return databaseSummaries;
+ }
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 3d708035f..9936f9430 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -55,6 +55,7 @@ import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.metadata.AggFunctions;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
@@ -874,11 +875,27 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
admin.createDatabase("db3", DatabaseDescriptor.EMPTY, true).get();
assertThat(admin.listDatabases().get())
.containsExactlyInAnyOrder("test_db", "db1", "db2", "db3",
"fluss");
+ Map<String, Integer> databaseSummaries =
+ admin.listDatabaseSummaries().get().stream()
+ .collect(
+ Collectors.toMap(
+ DatabaseSummary::getDatabaseName,
+ DatabaseSummary::getTableCount));
+ assertThat(databaseSummaries.get("db1")).isEqualTo(0);
+ assertThat(databaseSummaries.get("db2")).isEqualTo(0);
admin.createTable(TablePath.of("db1", "table1"),
DEFAULT_TABLE_DESCRIPTOR, true).get();
admin.createTable(TablePath.of("db1", "table2"),
DEFAULT_TABLE_DESCRIPTOR, true).get();
assertThat(admin.listTables("db1").get()).containsExactlyInAnyOrder("table1",
"table2");
assertThat(admin.listTables("db2").get()).isEmpty();
+ databaseSummaries =
+ admin.listDatabaseSummaries().get().stream()
+ .collect(
+ Collectors.toMap(
+ DatabaseSummary::getDatabaseName,
+ DatabaseSummary::getTableCount));
+ assertThat(databaseSummaries.get("db1")).isEqualTo(1);
+ assertThat(databaseSummaries.get("db2")).isEqualTo(0);
assertThatThrownBy(() -> admin.listTables("unknown_db").get())
.cause()
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseSummary.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseSummary.java
new file mode 100644
index 000000000..997dacfad
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseSummary.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/**
+ * Aggregated summary information of a database for listing purposes.
+ *
+ * <p>This class contains aggregated metadata about a database, including
creation time, table
+ * count, and other summary statistics. It is distinct from {@link
DatabaseInfo} which contains
+ * complete database metadata including the {@link DatabaseDescriptor}.
+ *
+ * @since 0.6
+ */
+@PublicEvolving
+public class DatabaseSummary {
+ private final String databaseName;
+ private final long createdTime;
+ private final int tableCount;
+
+ public DatabaseSummary(String databaseName, long createdTime, int
tableCount) {
+ this.databaseName = databaseName;
+ this.createdTime = createdTime;
+ this.tableCount = tableCount;
+ }
+
+ /**
+ * Returns the name of the database.
+ *
+ * @return the database name
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * Returns the creation time of the database in milliseconds since epoch.
+ *
+ * @return the creation timestamp
+ */
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ /**
+ * Returns the number of tables in this database.
+ *
+ * @return the table count
+ */
+ public int getTableCount() {
+ return tableCount;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DatabaseSummary that = (DatabaseSummary) o;
+ return Objects.equals(createdTime, that.createdTime)
+ && Objects.equals(tableCount, that.tableCount)
+ && Objects.equals(databaseName, that.databaseName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(databaseName, createdTime, tableCount);
+ }
+
+ @Override
+ public String toString() {
+ return "DatabaseSummary{"
+ + "databaseName='"
+ + databaseName
+ + '\''
+ + ", createdTime="
+ + createdTime
+ + ", tableCount="
+ + tableCount
+ + '\''
+ + '}';
+ }
+}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index ea2a832ba..99cdf3238 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -93,9 +93,12 @@ message DatabaseExistsResponse {
// list databases request and response
message ListDatabasesRequest {
+ optional bool include_summary = 1;
}
+
message ListDatabasesResponse {
repeated string database_name = 1;
+ repeated PbDatabaseSummary database_summary = 2;
}
// create table request and response
@@ -1129,4 +1132,10 @@ message PbBucketOffset {
message PbProducerTableOffsets {
required int64 table_id = 1;
repeated PbBucketOffset bucket_offsets = 2;
+}
+
+message PbDatabaseSummary {
+ required string database_name = 1;
+ required int64 created_time = 2;
+ required int32 table_count = 3;
}
\ No newline at end of file
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
index 9e2bcbf16..74b3ae585 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
@@ -122,6 +122,7 @@ import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListAclsRe
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toGetFileSystemSecurityTokenResponse;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toListPartitionInfosResponse;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toPbConfigEntries;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toPbDatabaseSummary;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -217,7 +218,13 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
authorizedDatabase.stream().map(Resource::getName).collect(Collectors.toList());
}
- response.addAllDatabaseNames(databaseNames);
+ if (request.hasIncludeSummary() && request.isIncludeSummary()) {
+ response.addAllDatabaseSummaries(
+
toPbDatabaseSummary(metadataManager.listDatabaseSummaries(databaseNames)));
+ } else {
+ response.addAllDatabaseNames(databaseNames);
+ }
+
return CompletableFuture.completedFuture(response);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
b/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
index 136673fb7..31e6666ac 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
@@ -139,8 +139,8 @@ public class ZkNodeChangeNotificationWatcher {
for (String notification : sortedNotifications) {
String notificationNode = seqNodeRoot + "/" + notification;
try {
- Optional<Stat> state =
zooKeeperClient.getStat(notificationNode);
- if (state.isPresent() && now - state.get().getCtime() >=
changeExpirationMs) {
+ Stat state = zooKeeperClient.getStat(notificationNode).get();
+ if (now - state.getCtime() >= changeExpirationMs) {
LOG.debug("Purging change notification {}",
notificationNode);
zooKeeperClient.deletePath(notificationNode);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 02a02cc44..c9537136f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -37,6 +37,7 @@ import org.apache.fluss.exception.TooManyPartitionsException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaInfo;
@@ -167,6 +168,12 @@ public class MetadataManager {
return uncheck(zookeeperClient::listDatabases, "Fail to list
database");
}
+ public List<DatabaseSummary> listDatabaseSummaries(Collection<String>
databaseNames) {
+ return uncheck(
+ () -> zookeeperClient.listDatabaseSummaries(databaseNames),
+ "Fail to get database summaries for " + databaseNames);
+ }
+
public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException("Database " + databaseName + "
does not exist.");
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index a4bfa01c5..3a8dd5c24 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -29,6 +29,7 @@ import org.apache.fluss.config.cluster.ColumnPositionType;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -95,6 +96,7 @@ import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbBucketMetadata;
import org.apache.fluss.rpc.messages.PbBucketOffset;
import org.apache.fluss.rpc.messages.PbCreateAclRespInfo;
+import org.apache.fluss.rpc.messages.PbDatabaseSummary;
import org.apache.fluss.rpc.messages.PbDescribeConfig;
import org.apache.fluss.rpc.messages.PbDropAclsFilterResult;
import org.apache.fluss.rpc.messages.PbDropAclsMatchingAcl;
@@ -1889,6 +1891,18 @@ public class ServerRpcMessageUtils {
.collect(Collectors.toList());
}
+ public static List<PbDatabaseSummary> toPbDatabaseSummary(
+ List<DatabaseSummary> databaseSummaries) {
+ return databaseSummaries.stream()
+ .map(
+ databaseSummary ->
+ new PbDatabaseSummary()
+
.setDatabaseName(databaseSummary.getDatabaseName())
+
.setCreatedTime(databaseSummary.getCreatedTime())
+
.setTableCount(databaseSummary.getTableCount()))
+ .collect(Collectors.toList());
+ }
+
public static RebalanceResponse makeRebalanceResponse(String rebalanceId) {
return new RebalanceResponse().setRebalanceId(rebalanceId);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
index 9163bd1b6..013731dbd 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
@@ -45,4 +45,11 @@ public abstract class ZkAsyncRequest {
super(path);
}
}
+
+ /** The request for ZooKeeper checkExists async operation. */
+ public static class ZkCheckExistsRequest extends ZkAsyncRequest {
+ protected ZkCheckExistsRequest(String path) {
+ super(path);
+ }
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
index 900939f44..627520146 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.zk;
import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import
org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException.Code;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.Optional;
@@ -58,6 +59,18 @@ public abstract class ZkAsyncResponse {
}
}
+ /** Returns a string representation of the error message, or empty string
if none. */
+ public String getErrorMessage() {
+ return resultException()
+ .map(e -> e.getClass().getSimpleName() + ": " + e.getMessage())
+ .orElse("");
+ }
+
+ /** Returns true if the response indicates an error. */
+ public boolean hasError() {
+ return resultCode != Code.OK;
+ }
+
//
-------------------------------------------------------------------------------------------
/** The response for ZooKeeper getData async operation. */
@@ -100,4 +113,23 @@ public abstract class ZkAsyncResponse {
event.getPath(), Code.get(event.getResultCode()),
event.getChildren());
}
}
+
+ /** The response for ZooKeeper checkExists async operation. */
+ public static class ZkCheckExistsResponse extends ZkAsyncResponse {
+ private final Stat stat;
+
+ public ZkCheckExistsResponse(String path, KeeperException.Code
resultCode, Stat stat) {
+ super(path, resultCode);
+ this.stat = stat;
+ }
+
+ public Stat getStat() {
+ return stat;
+ }
+
+ public static ZkCheckExistsResponse create(CuratorEvent event) {
+ return new ZkCheckExistsResponse(
+ event.getPath(), Code.get(event.getResultCode()),
event.getStat());
+ }
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index 4d9bf283d..86f3b952f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.Schema;
@@ -34,8 +35,10 @@ import org.apache.fluss.security.acl.ResourceType;
import org.apache.fluss.server.authorizer.DefaultAuthorizer.VersionedAcls;
import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo;
import org.apache.fluss.server.metadata.BucketMetadata;
+import org.apache.fluss.server.zk.ZkAsyncRequest.ZkCheckExistsRequest;
import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetChildrenRequest;
import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetDataRequest;
+import org.apache.fluss.server.zk.ZkAsyncResponse.ZkCheckExistsResponse;
import org.apache.fluss.server.zk.ZkAsyncResponse.ZkGetChildrenResponse;
import org.apache.fluss.server.zk.ZkAsyncResponse.ZkGetDataResponse;
import org.apache.fluss.server.zk.data.BucketSnapshot;
@@ -487,6 +490,36 @@ public class ZooKeeperClient implements AutoCloseable {
return getChildren(DatabasesZNode.path());
}
+ public List<DatabaseSummary> listDatabaseSummaries(Collection<String>
databaseNames)
+ throws Exception {
+ Map<String, String> path2DatabaseNamesMap =
+ databaseNames.stream()
+ .collect(toMap(DatabaseZNode::path, databaseName ->
databaseName));
+ List<ZkCheckExistsResponse> statsInBackground =
+ getStatInBackground(path2DatabaseNamesMap.keySet());
+ List<DatabaseSummary> databaseSummaries = new ArrayList<>();
+ for (ZkCheckExistsResponse response : statsInBackground) {
+ Stat stat = response.getStat();
+ if (!response.hasError() && stat != null) {
+ // To decrease the cost, use zk node creation time as the
database creation
+ // time rather than create_time in node data.
+ databaseSummaries.add(
+ new DatabaseSummary(
+ path2DatabaseNamesMap.get(response.getPath()),
+ response.getStat().getCtime(),
+ response.getStat().getNumChildren()));
+ } else {
+ // silently ignore the database which does not exist anymore,
+ // because the database names are listed by server not user
+ LOG.warn(
+ "Failed to get database summary for database {}. {}",
+ path2DatabaseNamesMap.get(response.getPath()),
+ response.getErrorMessage());
+ }
+ }
+ return databaseSummaries;
+ }
+
public List<String> listTables(String databaseName) throws Exception {
return getChildren(TablesZNode.path(databaseName));
}
@@ -1426,7 +1459,8 @@ public class ZooKeeperClient implements AutoCloseable {
} else if (request instanceof ZkGetChildrenRequest) {
zkClient.getChildren().inBackground(callback).forPath(request.getPath());
-
+ } else if (request instanceof ZkCheckExistsRequest) {
+
zkClient.checkExists().inBackground(callback).forPath(request.getPath());
} else {
throw new IllegalArgumentException(
"Unsupported request type: " +
request.getClass());
@@ -1498,6 +1532,20 @@ public class ZooKeeperClient implements AutoCloseable {
return handleRequestInBackground(requests, ZkGetDataResponse::create);
}
+ /**
+ * Gets the stat of given zk node paths in background.
+ *
+ * @param paths the paths to fetch stat
+ * @return list of async responses for each path
+ * @throws Exception if there is an error during the operation
+ */
+ private List<ZkCheckExistsResponse> getStatInBackground(Collection<String>
paths)
+ throws Exception {
+ List<ZkCheckExistsRequest> requests =
+
paths.stream().map(ZkCheckExistsRequest::new).collect(Collectors.toList());
+ return handleRequestInBackground(requests,
ZkCheckExistsResponse::create);
+ }
+
/**
* Send a pipelined sequence of requests and wait for all of their
responses synchronously in
* background.
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
index c97cd4e99..2504aebed 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
@@ -36,7 +36,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
import java.util.List;
-import java.util.Optional;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
@@ -104,9 +103,7 @@ public abstract class ServerTestBase {
retry(
Duration.ofMinutes(2),
() -> {
- Optional<Stat> optionalStat =
zookeeperClient.getStat(path);
- assertThat(optionalStat).isPresent();
- Stat stat = optionalStat.get();
+ Stat stat = zookeeperClient.getStat(path).get();
assertThat(stat.getCtime()).isGreaterThan(oldNodeCtime);
});
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
index 641fb5c12..d43784c1a 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
@@ -112,7 +112,7 @@ public class ZkNodeChangeNotificationWatcherTest {
clock.advanceTime(
maxCtimeBeforeStart - startTime +
Duration.ofMinutes(5).toMillis(),
TimeUnit.MILLISECONDS);
- // Insert a new notification to trigger the purging of obsolete
notificatios.
+ // Insert a new notification to trigger the purging of obsolete
notifications.
zookeeperClient.insertAclChangeNotification(newNoticedResource);
retry(
Duration.ofMinutes(1),
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
index f6caffd19..94cb34b23 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
@@ -23,6 +23,7 @@ import
org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
@@ -667,4 +668,40 @@ class ZooKeeperClientTest {
.isEqualTo("zookeeper2");
}
}
+
+ @Test
+ void testGetDatabaseSummary() throws Exception {
+ TablePath tablePath = TablePath.of("db", "tb1");
+
+ assertThat(
+ zookeeperClient.listDatabaseSummaries(
+
Collections.singletonList(tablePath.getDatabaseName())))
+ .isEmpty();
+
+ // register table.
+ long beforeCreateTime = System.currentTimeMillis();
+ TableRegistration tableReg1 =
+ new TableRegistration(
+ 11,
+ "first table",
+ Arrays.asList("a", "b"),
+ new TableDescriptor.TableDistribution(16,
Collections.singletonList("a")),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ beforeCreateTime,
+ beforeCreateTime);
+ zookeeperClient.registerTable(tablePath, tableReg1);
+
+ long afterCreateTime = System.currentTimeMillis();
+ List<DatabaseSummary> databaseSummaries =
+ zookeeperClient.listDatabaseSummaries(
+
Collections.singletonList(tablePath.getDatabaseName()));
+ assertThat(databaseSummaries).hasSize(1);
+ DatabaseSummary databaseSummary = databaseSummaries.get(0);
+ assertThat(databaseSummary.getDatabaseName()).isEqualTo("db");
+ assertThat(databaseSummary.getTableCount()).isEqualTo(1);
+ assertThat(databaseSummary.getCreatedTime())
+ .isGreaterThanOrEqualTo(beforeCreateTime)
+ .isLessThanOrEqualTo(afterCreateTime);
+ }
}