This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 6655bdcf2 [client] Support list database summaries (#2515)
6655bdcf2 is described below

commit 6655bdcf2c9c933c7c492016a7708c06301ae8f2
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Feb 5 16:57:00 2026 +0800

    [client] Support list database summaries (#2515)
---
 .../java/org/apache/fluss/client/admin/Admin.java  |  15 +++
 .../org/apache/fluss/client/admin/FlussAdmin.java  |   9 ++
 .../fluss/client/utils/ClientRpcMessageUtils.java  |  23 +++++
 .../fluss/client/admin/FlussAdminITCase.java       |  17 ++++
 .../org/apache/fluss/metadata/DatabaseSummary.java | 104 +++++++++++++++++++++
 fluss-rpc/src/main/proto/FlussApi.proto            |   9 ++
 .../org/apache/fluss/server/RpcServiceBase.java    |   9 +-
 .../ZkNodeChangeNotificationWatcher.java           |   4 +-
 .../fluss/server/coordinator/MetadataManager.java  |   7 ++
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  14 +++
 .../org/apache/fluss/server/zk/ZkAsyncRequest.java |   7 ++
 .../apache/fluss/server/zk/ZkAsyncResponse.java    |  32 +++++++
 .../apache/fluss/server/zk/ZooKeeperClient.java    |  50 +++++++++-
 .../org/apache/fluss/server/ServerTestBase.java    |   5 +-
 .../ZkNodeChangeNotificationWatcherTest.java       |   2 +-
 .../fluss/server/zk/ZooKeeperClientTest.java       |  37 ++++++++
 16 files changed, 335 insertions(+), 9 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index 6c0d6876e..b6e9ea1d6 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -55,6 +55,7 @@ import org.apache.fluss.exception.TooManyBucketsException;
 import org.apache.fluss.exception.TooManyPartitionsException;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -179,6 +180,20 @@ public interface Admin extends AutoCloseable {
     /** List all databases in fluss cluster asynchronously. */
     CompletableFuture<List<String>> listDatabases();
 
+    /**
+     * List all databases' summary information in fluss cluster 
asynchronously. The difference
+     * between this method and {@link #listDatabases()} is that this method 
also includes some
+     * summaries for the database, like {@link 
DatabaseSummary#getCreatedTime()} and {@link
+     * DatabaseSummary#getTableCount()}.
+     *
+     * <p>When interacting with an older version of the fluss cluster that does not 
support this API, it will
+     * fall back to {@link #listDatabases()} with {@code -1} value for {@link
+     * DatabaseSummary#getCreatedTime()} and {@link 
DatabaseSummary#getTableCount()}.
+     *
+     * @since 0.9
+     */
+    CompletableFuture<List<DatabaseSummary>> listDatabaseSummaries();
+
     /**
      * Create a new table asynchronously.
      *
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 543d65128..9df4bf248 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -33,6 +33,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
 import org.apache.fluss.exception.LeaderNotAvailableException;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.PhysicalTablePath;
@@ -245,6 +246,14 @@ public class FlussAdmin implements Admin {
                 .thenApply(ListDatabasesResponse::getDatabaseNamesList);
     }
 
+    @Override
+    public CompletableFuture<List<DatabaseSummary>> listDatabaseSummaries() {
+        ListDatabasesRequest request = new 
ListDatabasesRequest().setIncludeSummary(true);
+        return readOnlyGateway
+                .listDatabases(request)
+                .thenApply(ClientRpcMessageUtils::toDatabaseSummaries);
+    }
+
     @Override
     public CompletableFuture<Void> createTable(
             TablePath tablePath, TableDescriptor tableDescriptor, boolean 
ignoreIfExists) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index 68d8712f1..75cafb491 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -36,6 +36,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.FsPathAndFileName;
 import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.PhysicalTablePath;
@@ -50,6 +51,7 @@ import 
org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
 import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
 import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
 import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
+import org.apache.fluss.rpc.messages.ListDatabasesResponse;
 import org.apache.fluss.rpc.messages.ListOffsetsRequest;
 import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
 import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
@@ -58,6 +60,7 @@ import org.apache.fluss.rpc.messages.MetadataRequest;
 import org.apache.fluss.rpc.messages.PbAddColumn;
 import org.apache.fluss.rpc.messages.PbAlterConfig;
 import org.apache.fluss.rpc.messages.PbBucketOffset;
+import org.apache.fluss.rpc.messages.PbDatabaseSummary;
 import org.apache.fluss.rpc.messages.PbDescribeConfig;
 import org.apache.fluss.rpc.messages.PbDropColumn;
 import org.apache.fluss.rpc.messages.PbKeyValue;
@@ -646,4 +649,24 @@ public class ClientRpcMessageUtils {
         long expirationTime = response.hasExpirationTime() ? 
response.getExpirationTime() : 0;
         return new ProducerOffsetsResult(response.getProducerId(), 
tableOffsets, expirationTime);
     }
+
+    public static List<DatabaseSummary> 
toDatabaseSummaries(ListDatabasesResponse response) {
+        List<DatabaseSummary> databaseSummaries = new ArrayList<>();
+        for (PbDatabaseSummary pbDatabaseSummary : 
response.getDatabaseSummariesList()) {
+            databaseSummaries.add(
+                    new DatabaseSummary(
+                            pbDatabaseSummary.getDatabaseName(),
+                            pbDatabaseSummary.getCreatedTime(),
+                            pbDatabaseSummary.getTableCount()));
+        }
+
+        if (response.getDatabaseNamesCount() > 0 && 
response.getDatabaseSummariesCount() == 0) {
+            // backward-compatibility for older server versions that only 
return database names
+            for (String dbName : response.getDatabaseNamesList()) {
+                databaseSummaries.add(new DatabaseSummary(dbName, -1L, -1));
+            }
+        }
+
+        return databaseSummaries;
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 3d708035f..9936f9430 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -55,6 +55,7 @@ import org.apache.fluss.fs.FsPathAndFileName;
 import org.apache.fluss.metadata.AggFunctions;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
@@ -874,11 +875,27 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
         admin.createDatabase("db3", DatabaseDescriptor.EMPTY, true).get();
         assertThat(admin.listDatabases().get())
                 .containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", 
"fluss");
+        Map<String, Integer> databaseSummaries =
+                admin.listDatabaseSummaries().get().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        DatabaseSummary::getDatabaseName,
+                                        DatabaseSummary::getTableCount));
+        assertThat(databaseSummaries.get("db1")).isEqualTo(0);
+        assertThat(databaseSummaries.get("db2")).isEqualTo(0);
 
         admin.createTable(TablePath.of("db1", "table1"), 
DEFAULT_TABLE_DESCRIPTOR, true).get();
         admin.createTable(TablePath.of("db1", "table2"), 
DEFAULT_TABLE_DESCRIPTOR, true).get();
         
assertThat(admin.listTables("db1").get()).containsExactlyInAnyOrder("table1", 
"table2");
         assertThat(admin.listTables("db2").get()).isEmpty();
+        databaseSummaries =
+                admin.listDatabaseSummaries().get().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        DatabaseSummary::getDatabaseName,
+                                        DatabaseSummary::getTableCount));
+        assertThat(databaseSummaries.get("db1")).isEqualTo(1);
+        assertThat(databaseSummaries.get("db2")).isEqualTo(0);
 
         assertThatThrownBy(() -> admin.listTables("unknown_db").get())
                 .cause()
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseSummary.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseSummary.java
new file mode 100644
index 000000000..997dacfad
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseSummary.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/**
+ * Aggregated summary information of a database for listing purposes.
+ *
+ * <p>This class contains aggregated metadata about a database, including 
creation time, table
+ * count, and other summary statistics. It is distinct from {@link 
DatabaseInfo} which contains
+ * complete database metadata including the {@link DatabaseDescriptor}.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public class DatabaseSummary {
+    private final String databaseName;
+    private final long createdTime;
+    private final int tableCount;
+
+    public DatabaseSummary(String databaseName, long createdTime, int 
tableCount) {
+        this.databaseName = databaseName;
+        this.createdTime = createdTime;
+        this.tableCount = tableCount;
+    }
+
+    /**
+     * Returns the name of the database.
+     *
+     * @return the database name
+     */
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    /**
+     * Returns the creation time of the database in milliseconds since epoch.
+     *
+     * @return the creation timestamp
+     */
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    /**
+     * Returns the number of tables in this database.
+     *
+     * @return the table count
+     */
+    public int getTableCount() {
+        return tableCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DatabaseSummary that = (DatabaseSummary) o;
+        return Objects.equals(createdTime, that.createdTime)
+                && Objects.equals(tableCount, that.tableCount)
+                && Objects.equals(databaseName, that.databaseName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(databaseName, createdTime, tableCount);
+    }
+
+    @Override
+    public String toString() {
+        return "DatabaseSummary{"
+                + "databaseName='"
+                + databaseName
+                + '\''
+                + ", createdTime="
+                + createdTime
+                + ", tableCount="
+                + tableCount
+                + '\''
+                + '}';
+    }
+}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index ea2a832ba..99cdf3238 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -93,9 +93,12 @@ message DatabaseExistsResponse {
 
 // list databases request and response
 message ListDatabasesRequest {
+  optional bool include_summary = 1;
 }
+
 message ListDatabasesResponse {
   repeated string database_name = 1;
+  repeated PbDatabaseSummary database_summary = 2;
 }
 
 // create table request and response
@@ -1129,4 +1132,10 @@ message PbBucketOffset {
 message PbProducerTableOffsets {
   required int64 table_id = 1;
   repeated PbBucketOffset bucket_offsets = 2;
+}
+
+message PbDatabaseSummary {
+  required string database_name = 1;
+  required int64 created_time = 2;
+  required int32 table_count = 3;
 }
\ No newline at end of file
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java 
b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
index 9e2bcbf16..74b3ae585 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
@@ -122,6 +122,7 @@ import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListAclsRe
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toGetFileSystemSecurityTokenResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toListPartitionInfosResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toPbConfigEntries;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toPbDatabaseSummary;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
 import static org.apache.fluss.utils.Preconditions.checkState;
 
@@ -217,7 +218,13 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
                     
authorizedDatabase.stream().map(Resource::getName).collect(Collectors.toList());
         }
 
-        response.addAllDatabaseNames(databaseNames);
+        if (request.hasIncludeSummary() && request.isIncludeSummary()) {
+            response.addAllDatabaseSummaries(
+                    
toPbDatabaseSummary(metadataManager.listDatabaseSummaries(databaseNames)));
+        } else {
+            response.addAllDatabaseNames(databaseNames);
+        }
+
         return CompletableFuture.completedFuture(response);
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
index 136673fb7..31e6666ac 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcher.java
@@ -139,8 +139,8 @@ public class ZkNodeChangeNotificationWatcher {
         for (String notification : sortedNotifications) {
             String notificationNode = seqNodeRoot + "/" + notification;
             try {
-                Optional<Stat> state = 
zooKeeperClient.getStat(notificationNode);
-                if (state.isPresent() && now - state.get().getCtime() >= 
changeExpirationMs) {
+                Stat state = zooKeeperClient.getStat(notificationNode).get();
+                if (now - state.getCtime() >= changeExpirationMs) {
                     LOG.debug("Purging change notification {}", 
notificationNode);
                     zooKeeperClient.deletePath(notificationNode);
                 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 02a02cc44..c9537136f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -37,6 +37,7 @@ import org.apache.fluss.exception.TooManyPartitionsException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaInfo;
@@ -167,6 +168,12 @@ public class MetadataManager {
         return uncheck(zookeeperClient::listDatabases, "Fail to list 
database");
     }
 
+    public List<DatabaseSummary> listDatabaseSummaries(Collection<String> 
databaseNames) {
+        return uncheck(
+                () -> zookeeperClient.listDatabaseSummaries(databaseNames),
+                "Fail to get database summaries for " + databaseNames);
+    }
+
     public List<String> listTables(String databaseName) throws 
DatabaseNotExistException {
         if (!databaseExists(databaseName)) {
             throw new DatabaseNotExistException("Database " + databaseName + " 
does not exist.");
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index a4bfa01c5..3a8dd5c24 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -29,6 +29,7 @@ import org.apache.fluss.config.cluster.ColumnPositionType;
 import org.apache.fluss.config.cluster.ConfigEntry;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -95,6 +96,7 @@ import org.apache.fluss.rpc.messages.PbAlterConfig;
 import org.apache.fluss.rpc.messages.PbBucketMetadata;
 import org.apache.fluss.rpc.messages.PbBucketOffset;
 import org.apache.fluss.rpc.messages.PbCreateAclRespInfo;
+import org.apache.fluss.rpc.messages.PbDatabaseSummary;
 import org.apache.fluss.rpc.messages.PbDescribeConfig;
 import org.apache.fluss.rpc.messages.PbDropAclsFilterResult;
 import org.apache.fluss.rpc.messages.PbDropAclsMatchingAcl;
@@ -1889,6 +1891,18 @@ public class ServerRpcMessageUtils {
                 .collect(Collectors.toList());
     }
 
+    public static List<PbDatabaseSummary> toPbDatabaseSummary(
+            List<DatabaseSummary> databaseSummaries) {
+        return databaseSummaries.stream()
+                .map(
+                        databaseSummary ->
+                                new PbDatabaseSummary()
+                                        
.setDatabaseName(databaseSummary.getDatabaseName())
+                                        
.setCreatedTime(databaseSummary.getCreatedTime())
+                                        
.setTableCount(databaseSummary.getTableCount()))
+                .collect(Collectors.toList());
+    }
+
     public static RebalanceResponse makeRebalanceResponse(String rebalanceId) {
         return new RebalanceResponse().setRebalanceId(rebalanceId);
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
index 9163bd1b6..013731dbd 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
@@ -45,4 +45,11 @@ public abstract class ZkAsyncRequest {
             super(path);
         }
     }
+
+    /** The request for ZooKeeper checkExists async operation. */
+    public static class ZkCheckExistsRequest extends ZkAsyncRequest {
+        protected ZkCheckExistsRequest(String path) {
+            super(path);
+        }
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
index 900939f44..627520146 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.zk;
 import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
 import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
 import 
org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException.Code;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 
 import java.util.List;
 import java.util.Optional;
@@ -58,6 +59,18 @@ public abstract class ZkAsyncResponse {
         }
     }
 
+    /** Returns a string representation of the error message, or empty string 
if none. */
+    public String getErrorMessage() {
+        return resultException()
+                .map(e -> e.getClass().getSimpleName() + ": " + e.getMessage())
+                .orElse("");
+    }
+
+    /** Returns true if the response indicates an error. */
+    public boolean hasError() {
+        return resultCode != Code.OK;
+    }
+
     // 
-------------------------------------------------------------------------------------------
 
     /** The response for ZooKeeper getData async operation. */
@@ -100,4 +113,23 @@ public abstract class ZkAsyncResponse {
                     event.getPath(), Code.get(event.getResultCode()), 
event.getChildren());
         }
     }
+
+    /** The response for ZooKeeper checkExists async operation. */
+    public static class ZkCheckExistsResponse extends ZkAsyncResponse {
+        private final Stat stat;
+
+        public ZkCheckExistsResponse(String path, KeeperException.Code 
resultCode, Stat stat) {
+            super(path, resultCode);
+            this.stat = stat;
+        }
+
+        public Stat getStat() {
+            return stat;
+        }
+
+        public static ZkCheckExistsResponse create(CuratorEvent event) {
+            return new ZkCheckExistsResponse(
+                    event.getPath(), Code.get(event.getResultCode()), 
event.getStat());
+        }
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index 4d9bf283d..86f3b952f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.Schema;
@@ -34,8 +35,10 @@ import org.apache.fluss.security.acl.ResourceType;
 import org.apache.fluss.server.authorizer.DefaultAuthorizer.VersionedAcls;
 import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo;
 import org.apache.fluss.server.metadata.BucketMetadata;
+import org.apache.fluss.server.zk.ZkAsyncRequest.ZkCheckExistsRequest;
 import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetChildrenRequest;
 import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetDataRequest;
+import org.apache.fluss.server.zk.ZkAsyncResponse.ZkCheckExistsResponse;
 import org.apache.fluss.server.zk.ZkAsyncResponse.ZkGetChildrenResponse;
 import org.apache.fluss.server.zk.ZkAsyncResponse.ZkGetDataResponse;
 import org.apache.fluss.server.zk.data.BucketSnapshot;
@@ -487,6 +490,36 @@ public class ZooKeeperClient implements AutoCloseable {
         return getChildren(DatabasesZNode.path());
     }
 
+    public List<DatabaseSummary> listDatabaseSummaries(Collection<String> 
databaseNames)
+            throws Exception {
+        Map<String, String> path2DatabaseNamesMap =
+                databaseNames.stream()
+                        .collect(toMap(DatabaseZNode::path, databaseName -> 
databaseName));
+        List<ZkCheckExistsResponse> statsInBackground =
+                getStatInBackground(path2DatabaseNamesMap.keySet());
+        List<DatabaseSummary> databaseSummaries = new ArrayList<>();
+        for (ZkCheckExistsResponse response : statsInBackground) {
+            Stat stat = response.getStat();
+            if (!response.hasError() && stat != null) {
+                // To decrease the cost, use zk node creation time as the 
database creation
+                // time rather than create_time in node data.
+                databaseSummaries.add(
+                        new DatabaseSummary(
+                                path2DatabaseNamesMap.get(response.getPath()),
+                                response.getStat().getCtime(),
+                                response.getStat().getNumChildren()));
+            } else {
+                // skip (and log) the database which does not exist anymore,
+                // because the database names are listed by server not user
+                LOG.warn(
+                        "Failed to get database summary for database {}. {}",
+                        path2DatabaseNamesMap.get(response.getPath()),
+                        response.getErrorMessage());
+            }
+        }
+        return databaseSummaries;
+    }
+
     public List<String> listTables(String databaseName) throws Exception {
         return getChildren(TablesZNode.path(databaseName));
     }
@@ -1426,7 +1459,8 @@ public class ZooKeeperClient implements AutoCloseable {
 
                     } else if (request instanceof ZkGetChildrenRequest) {
                         
zkClient.getChildren().inBackground(callback).forPath(request.getPath());
-
+                    } else if (request instanceof ZkCheckExistsRequest) {
+                        
zkClient.checkExists().inBackground(callback).forPath(request.getPath());
                     } else {
                         throw new IllegalArgumentException(
                                 "Unsupported request type: " + 
request.getClass());
@@ -1498,6 +1532,20 @@ public class ZooKeeperClient implements AutoCloseable {
         return handleRequestInBackground(requests, ZkGetDataResponse::create);
     }
 
+    /**
+     * Gets the stat of given zk node paths in background.
+     *
+     * @param paths the paths to fetch stat
+     * @return list of async responses for each path
+     * @throws Exception if there is an error during the operation
+     */
+    private List<ZkCheckExistsResponse> getStatInBackground(Collection<String> 
paths)
+            throws Exception {
+        List<ZkCheckExistsRequest> requests =
+                
paths.stream().map(ZkCheckExistsRequest::new).collect(Collectors.toList());
+        return handleRequestInBackground(requests, 
ZkCheckExistsResponse::create);
+    }
+
     /**
      * Send a pipelined sequence of requests and wait for all of their 
responses synchronously in
      * background.
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java 
b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
index c97cd4e99..2504aebed 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
@@ -36,7 +36,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -104,9 +103,7 @@ public abstract class ServerTestBase {
         retry(
                 Duration.ofMinutes(2),
                 () -> {
-                    Optional<Stat> optionalStat = 
zookeeperClient.getStat(path);
-                    assertThat(optionalStat).isPresent();
-                    Stat stat = optionalStat.get();
+                    Stat stat = zookeeperClient.getStat(path).get();
                     assertThat(stat.getCtime()).isGreaterThan(oldNodeCtime);
                 });
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
index 641fb5c12..d43784c1a 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/ZkNodeChangeNotificationWatcherTest.java
@@ -112,7 +112,7 @@ public class ZkNodeChangeNotificationWatcherTest {
         clock.advanceTime(
                 maxCtimeBeforeStart - startTime + 
Duration.ofMinutes(5).toMillis(),
                 TimeUnit.MILLISECONDS);
-        // Insert a new notification to trigger the purging of obsolete 
notificatios.
+        // Insert a new notification to trigger the purging of obsolete 
notifications.
         zookeeperClient.insertAclChangeNotification(newNoticedResource);
         retry(
                 Duration.ofMinutes(1),
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
index f6caffd19..94cb34b23 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
@@ -23,6 +23,7 @@ import 
org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
 import org.apache.fluss.cluster.rebalance.ServerTag;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
@@ -667,4 +668,40 @@ class ZooKeeperClientTest {
                     .isEqualTo("zookeeper2");
         }
     }
+
+    @Test
+    void testGetDatabaseSummary() throws Exception {
+        TablePath tablePath = TablePath.of("db", "tb1");
+
+        assertThat(
+                        zookeeperClient.listDatabaseSummaries(
+                                
Collections.singletonList(tablePath.getDatabaseName())))
+                .isEmpty();
+
+        // register table.
+        long beforeCreateTime = System.currentTimeMillis();
+        TableRegistration tableReg1 =
+                new TableRegistration(
+                        11,
+                        "first table",
+                        Arrays.asList("a", "b"),
+                        new TableDescriptor.TableDistribution(16, 
Collections.singletonList("a")),
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        beforeCreateTime,
+                        beforeCreateTime);
+        zookeeperClient.registerTable(tablePath, tableReg1);
+
+        long afterCreateTime = System.currentTimeMillis();
+        List<DatabaseSummary> databaseSummaries =
+                zookeeperClient.listDatabaseSummaries(
+                        
Collections.singletonList(tablePath.getDatabaseName()));
+        assertThat(databaseSummaries).hasSize(1);
+        DatabaseSummary databaseSummary = databaseSummaries.get(0);
+        assertThat(databaseSummary.getDatabaseName()).isEqualTo("db");
+        assertThat(databaseSummary.getTableCount()).isEqualTo(1);
+        assertThat(databaseSummary.getCreatedTime())
+                .isGreaterThanOrEqualTo(beforeCreateTime)
+                .isLessThanOrEqualTo(afterCreateTime);
+    }
 }

Reply via email to