This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 303af6d91 [lake] Add metric of LAKE_TABLE_COUNT (#2528)
303af6d91 is described below

commit 303af6d91fbc69ee28ec18e18a19409a84a93d08
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Feb 2 21:05:05 2026 +0800

    [lake] Add metric of LAKE_TABLE_COUNT (#2528)
---
 .../java/org/apache/fluss/metrics/MetricNames.java |  1 +
 .../server/coordinator/CoordinatorContext.java     | 15 ++++
 .../coordinator/event/CoordinatorEventManager.java |  8 ++
 .../server/coordinator/CoordinatorContextTest.java | 88 ++++++++++++++++++++++
 4 files changed, 112 insertions(+)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 454b75bdc..7f4ae5a03 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -39,6 +39,7 @@ public class MetricNames {
     public static final String ACTIVE_TABLET_SERVER_COUNT = 
"activeTabletServerCount";
     public static final String OFFLINE_BUCKET_COUNT = "offlineBucketCount";
     public static final String TABLE_COUNT = "tableCount";
+    public static final String LAKE_TABLE_COUNT = "lakeTableCount";
     public static final String BUCKET_COUNT = "bucketCount";
     public static final String PARTITION_COUNT = "partitionCount";
     public static final String REPLICAS_TO_DELETE_COUNT = 
"replicasToDeleteCount";
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
index cd55d55b1..bece7d9dc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
@@ -184,6 +184,21 @@ public class CoordinatorContext {
         return tablePathById;
     }
 
+    /**
+     * Counts the tables held in {@code tableInfoById} whose table
+     * configuration reports the datalake feature as enabled.
+     *
+     * @return the number of datalake-enabled tables
+     */
+    public int getLakeTableCount() {
+        long lakeTables =
+                tableInfoById.values().stream()
+                        .map(TableInfo::getTableConfig)
+                        .filter(config -> config.isDataLakeEnabled())
+                        .count();
+        // Bounded by the collection's int size, so narrowing is lossless.
+        return (int) lakeTables;
+    }
+
     public Set<TableBucket> getAllBuckets() {
         Set<TableBucket> allBuckets = new HashSet<>();
         for (Map.Entry<Long, Map<Integer, List<Integer>>> tableAssign :
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
index c32b71359..ad22f6d1c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
@@ -65,6 +65,7 @@ public final class CoordinatorEventManager implements 
EventManager {
     private volatile int tabletServerCount;
     private volatile int offlineBucketCount;
     private volatile int tableCount;
+    private volatile int lakeTableCount;
     private volatile int bucketCount;
     private volatile int partitionCount;
     private volatile int replicasToDeleteCount;
@@ -92,6 +93,7 @@ public final class CoordinatorEventManager implements 
EventManager {
         coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> 
offlineBucketCount);
         coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () -> 
bucketCount);
         coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () -> 
tableCount);
+        coordinatorMetricGroup.gauge(MetricNames.LAKE_TABLE_COUNT, () -> 
lakeTableCount);
         coordinatorMetricGroup.gauge(MetricNames.PARTITION_COUNT, () -> 
partitionCount);
         coordinatorMetricGroup.gauge(
                 MetricNames.REPLICAS_TO_DELETE_COUNT, () -> 
replicasToDeleteCount);
@@ -105,6 +107,7 @@ public final class CoordinatorEventManager implements 
EventManager {
                         context -> {
                             int tabletServerCount = 
context.getLiveTabletServers().size();
                             int tableCount = context.allTables().size();
+                            int lakeTableCount = context.getLakeTableCount();
                             int bucketCount = 
context.bucketLeaderAndIsr().size();
                             int partitionCount = 
context.getTotalPartitionCount();
                             int offlineBucketCount = 
context.getOfflineBucketCount();
@@ -137,6 +140,7 @@ public final class CoordinatorEventManager implements 
EventManager {
                             return new MetricsData(
                                     tabletServerCount,
                                     tableCount,
+                                    lakeTableCount,
                                     bucketCount,
                                     partitionCount,
                                     offlineBucketCount,
@@ -150,6 +154,7 @@ public final class CoordinatorEventManager implements 
EventManager {
             MetricsData metricsData = 
accessContextEvent.getResultFuture().get();
             this.tabletServerCount = metricsData.tabletServerCount;
             this.tableCount = metricsData.tableCount;
+            this.lakeTableCount = metricsData.lakeTableCount;
             this.bucketCount = metricsData.bucketCount;
             this.partitionCount = metricsData.partitionCount;
             this.offlineBucketCount = metricsData.offlineBucketCount;
@@ -272,6 +277,7 @@ public final class CoordinatorEventManager implements 
EventManager {
     private static class MetricsData {
         private final int tabletServerCount;
         private final int tableCount;
+        private final int lakeTableCount;
         private final int bucketCount;
         private final int partitionCount;
         private final int offlineBucketCount;
@@ -280,12 +286,14 @@ public final class CoordinatorEventManager implements 
EventManager {
         public MetricsData(
                 int tabletServerCount,
                 int tableCount,
+                int lakeTableCount,
                 int bucketCount,
                 int partitionCount,
                 int offlineBucketCount,
                 int replicasToDeleteCount) {
             this.tabletServerCount = tabletServerCount;
             this.tableCount = tableCount;
+            this.lakeTableCount = lakeTableCount;
             this.bucketCount = bucketCount;
             this.partitionCount = partitionCount;
             this.offlineBucketCount = offlineBucketCount;
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java
new file mode 100644
index 000000000..778ac45e1
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CoordinatorContext}. */
+class CoordinatorContextTest {
+
+    /**
+     * Verifies that {@link CoordinatorContext#getLakeTableCount()} counts only
+     * the tables created with the datalake option enabled, while
+     * {@code allTables()} grows with every registered table.
+     */
+    @Test
+    void testGetLakeTableCount() {
+        CoordinatorContext context = new CoordinatorContext();
+
+        // Initially, there should be no tables
+        assertThat(context.allTables()).isEmpty();
+        assertThat(context.getLakeTableCount()).isEqualTo(0);
+
+        // Add a non-lake table
+        TableInfo nonLakeTable = createTableInfo(1L, TablePath.of("db1", "table1"), false);
+        context.putTablePath(1L, nonLakeTable.getTablePath());
+        context.putTableInfo(nonLakeTable);
+
+        assertThat(context.allTables()).hasSize(1);
+        assertThat(context.getLakeTableCount()).isEqualTo(0);
+
+        // Add a lake table
+        TableInfo lakeTable = createTableInfo(2L, TablePath.of("db1", "table2"), true);
+        context.putTablePath(2L, lakeTable.getTablePath());
+        context.putTableInfo(lakeTable);
+
+        assertThat(context.allTables()).hasSize(2);
+        assertThat(context.getLakeTableCount()).isEqualTo(1);
+
+        // Add another lake table
+        TableInfo lakeTable2 = createTableInfo(3L, TablePath.of("db2", "table3"), true);
+        context.putTablePath(3L, lakeTable2.getTablePath());
+        context.putTableInfo(lakeTable2);
+
+        assertThat(context.allTables()).hasSize(3);
+        assertThat(context.getLakeTableCount()).isEqualTo(2);
+    }
+
+    /**
+     * Builds a {@link TableInfo} backed by a one-column ({@code f1 INT}) table
+     * descriptor for use in the test.
+     *
+     * @param tableId the id passed to {@code TableInfo.of}
+     * @param tablePath the path passed to {@code TableInfo.of}
+     * @param isLake the value stored under {@code TABLE_DATALAKE_ENABLED}
+     * @return the constructed table info
+     */
+    private TableInfo createTableInfo(long tableId, TablePath tablePath, boolean isLake) {
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("f1", DataTypes.INT()).build())
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ZERO)
+                        // The datalake flag is set exactly once, from isLake.
+                        .property(TABLE_DATALAKE_ENABLED, isLake)
+                        .distributedBy(1)
+                        .build();
+
+        return TableInfo.of(
+                tablePath,
+                tableId,
+                1,
+                tableDescriptor,
+                System.currentTimeMillis(),
+                System.currentTimeMillis());
+    }
+}

Reply via email to