This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 303af6d91 [lake] Add metric of LAKE_TABLE_COUNT (#2528)
303af6d91 is described below
commit 303af6d91fbc69ee28ec18e18a19409a84a93d08
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Feb 2 21:05:05 2026 +0800
[lake] Add metric of LAKE_TABLE_COUNT (#2528)
---
.../java/org/apache/fluss/metrics/MetricNames.java | 1 +
.../server/coordinator/CoordinatorContext.java | 15 ++++
.../coordinator/event/CoordinatorEventManager.java | 8 ++
.../server/coordinator/CoordinatorContextTest.java | 88 ++++++++++++++++++++++
4 files changed, 112 insertions(+)
diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 454b75bdc..7f4ae5a03 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -39,6 +39,7 @@ public class MetricNames {
     public static final String ACTIVE_TABLET_SERVER_COUNT = "activeTabletServerCount";
     public static final String OFFLINE_BUCKET_COUNT = "offlineBucketCount";
     public static final String TABLE_COUNT = "tableCount";
+    public static final String LAKE_TABLE_COUNT = "lakeTableCount";
     public static final String BUCKET_COUNT = "bucketCount";
     public static final String PARTITION_COUNT = "partitionCount";
     public static final String REPLICAS_TO_DELETE_COUNT = "replicasToDeleteCount";
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
index cd55d55b1..bece7d9dc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
@@ -184,6 +184,21 @@ public class CoordinatorContext {
         return tablePathById;
     }
 
+    /**
+     * Returns the number of lake tables (tables with datalake enabled) in the cluster.
+     *
+     * @return the count of lake tables
+     */
+    public int getLakeTableCount() {
+        int count = 0;
+        for (TableInfo tableInfo : tableInfoById.values()) {
+            if (tableInfo.getTableConfig().isDataLakeEnabled()) {
+                count++;
+            }
+        }
+        return count;
+    }
+
     public Set<TableBucket> getAllBuckets() {
         Set<TableBucket> allBuckets = new HashSet<>();
         for (Map.Entry<Long, Map<Integer, List<Integer>>> tableAssign :
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
index c32b71359..ad22f6d1c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
@@ -65,6 +65,7 @@ public final class CoordinatorEventManager implements EventManager {
     private volatile int tabletServerCount;
    private volatile int offlineBucketCount;
    private volatile int tableCount;
+    private volatile int lakeTableCount;
    private volatile int bucketCount;
    private volatile int partitionCount;
    private volatile int replicasToDeleteCount;
@@ -92,6 +93,7 @@ public final class CoordinatorEventManager implements EventManager {
         coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> offlineBucketCount);
         coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () -> bucketCount);
         coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () -> tableCount);
+        coordinatorMetricGroup.gauge(MetricNames.LAKE_TABLE_COUNT, () -> lakeTableCount);
         coordinatorMetricGroup.gauge(MetricNames.PARTITION_COUNT, () -> partitionCount);
         coordinatorMetricGroup.gauge(
                 MetricNames.REPLICAS_TO_DELETE_COUNT, () -> replicasToDeleteCount);
@@ -105,6 +107,7 @@ public final class CoordinatorEventManager implements EventManager {
                         context -> {
                             int tabletServerCount = context.getLiveTabletServers().size();
                             int tableCount = context.allTables().size();
+                            int lakeTableCount = context.getLakeTableCount();
                             int bucketCount = context.bucketLeaderAndIsr().size();
                             int partitionCount = context.getTotalPartitionCount();
                             int offlineBucketCount = context.getOfflineBucketCount();
@@ -137,6 +140,7 @@ public final class CoordinatorEventManager implements EventManager {
                             return new MetricsData(
                                     tabletServerCount,
                                     tableCount,
+                                    lakeTableCount,
                                     bucketCount,
                                     partitionCount,
                                     offlineBucketCount,
@@ -150,6 +154,7 @@ public final class CoordinatorEventManager implements EventManager {
             MetricsData metricsData = accessContextEvent.getResultFuture().get();
             this.tabletServerCount = metricsData.tabletServerCount;
             this.tableCount = metricsData.tableCount;
+            this.lakeTableCount = metricsData.lakeTableCount;
             this.bucketCount = metricsData.bucketCount;
             this.partitionCount = metricsData.partitionCount;
             this.offlineBucketCount = metricsData.offlineBucketCount;
@@ -272,6 +277,7 @@ public final class CoordinatorEventManager implements EventManager {
     private static class MetricsData {
         private final int tabletServerCount;
         private final int tableCount;
+        private final int lakeTableCount;
         private final int bucketCount;
         private final int partitionCount;
         private final int offlineBucketCount;
@@ -280,12 +286,14 @@ public final class CoordinatorEventManager implements EventManager {
         public MetricsData(
                 int tabletServerCount,
                 int tableCount,
+                int lakeTableCount,
                 int bucketCount,
                 int partitionCount,
                 int offlineBucketCount,
                 int replicasToDeleteCount) {
             this.tabletServerCount = tabletServerCount;
             this.tableCount = tableCount;
+            this.lakeTableCount = lakeTableCount;
             this.bucketCount = bucketCount;
             this.partitionCount = partitionCount;
             this.offlineBucketCount = offlineBucketCount;
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java
new file mode 100644
index 000000000..778ac45e1
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CoordinatorContext}. */
+class CoordinatorContextTest {
+
+ @Test
+ void testGetLakeTableCount() {
+ CoordinatorContext context = new CoordinatorContext();
+
+ // Initially, there should be no tables
+ assertThat(context.allTables()).isEmpty();
+ assertThat(context.getLakeTableCount()).isEqualTo(0);
+
+ // Add a non-lake table
+        TableInfo nonLakeTable = createTableInfo(1L, TablePath.of("db1", "table1"), false);
+ context.putTablePath(1L, nonLakeTable.getTablePath());
+ context.putTableInfo(nonLakeTable);
+
+ assertThat(context.allTables()).hasSize(1);
+ assertThat(context.getLakeTableCount()).isEqualTo(0);
+
+ // Add a lake table
+        TableInfo lakeTable = createTableInfo(2L, TablePath.of("db1", "table2"), true);
+ context.putTablePath(2L, lakeTable.getTablePath());
+ context.putTableInfo(lakeTable);
+
+ assertThat(context.allTables()).hasSize(2);
+ assertThat(context.getLakeTableCount()).isEqualTo(1);
+
+ // Add another lake table
+        TableInfo lakeTable2 = createTableInfo(3L, TablePath.of("db2", "table3"), true);
+ context.putTablePath(3L, lakeTable2.getTablePath());
+ context.putTableInfo(lakeTable2);
+
+ assertThat(context.allTables()).hasSize(3);
+ assertThat(context.getLakeTableCount()).isEqualTo(2);
+ }
+
+    private TableInfo createTableInfo(long tableId, TablePath tablePath, boolean isLake) {
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("f1", DataTypes.INT()).build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ZERO)
+ .property(TABLE_DATALAKE_ENABLED, isLake)
+ .distributedBy(1)
+ .build();
+
+ return TableInfo.of(
+ tablePath,
+ tableId,
+ 1,
+ tableDescriptor,
+ System.currentTimeMillis(),
+ System.currentTimeMillis());
+ }
+}