This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 459c763c3 [metric] Add user-level metrics for byteIn and byteOut 
(#2080)
459c763c3 is described below

commit 459c763c3787a980d2c6cf663bb433c0c6bade75
Author: xiaozhou <[email protected]>
AuthorDate: Sun Dec 21 19:13:40 2025 +0800

    [metric] Add user-level metrics for byteIn and byteOut (#2080)
    
    Co-authored-by: Jark Wu <[email protected]>
---
 .../java/org/apache/fluss/metrics/MetricNames.java |   6 +
 .../apache/fluss/server/entity/UserContext.java    |  36 ++++
 .../apache/fluss/server/metrics/UserMetrics.java   | 231 +++++++++++++++++++++
 .../metrics/group/AbstractUserMetricGroup.java     |  90 ++++++++
 .../server/metrics/group/UserMetricGroup.java      |  42 ++++
 .../metrics/group/UserPerTableMetricGroup.java     |  58 ++++++
 .../fluss/server/replica/ReplicaManager.java       |  37 +++-
 .../server/replica/delay/DelayedFetchLog.java      |  10 +-
 .../apache/fluss/server/tablet/TabletServer.java   |  16 +-
 .../apache/fluss/server/tablet/TabletService.java  |   3 +
 .../fluss/server/log/remote/RemoteLogITCase.java   |   1 +
 .../server/log/remote/RemoteLogManagerTest.java    |   6 +
 .../fluss/server/log/remote/RemoteLogTTLTest.java  |   1 +
 .../fluss/server/metrics/UserMetricsTest.java      | 128 ++++++++++++
 .../server/metrics/group/TestingMetricGroups.java  |  31 +++
 .../apache/fluss/server/replica/AdjustIsrTest.java |   3 +
 .../fluss/server/replica/ReplicaManagerTest.java   |  38 +++-
 .../fluss/server/replica/ReplicaTestBase.java      |   1 +
 .../server/replica/delay/DelayedFetchLogTest.java  |   4 +-
 .../replica/fetcher/ReplicaFetcherITCase.java      |   2 +
 .../replica/fetcher/ReplicaFetcherThreadTest.java  |  11 +
 .../replica/fetcher/TestingLeaderEndpoint.java     |   1 +
 .../maintenance/observability/monitor-metrics.md   |  23 +-
 23 files changed, 759 insertions(+), 20 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 4008c6b19..b1326d46b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -74,6 +74,12 @@ public class MetricNames {
     public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = 
"localSize";
     public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = 
"remoteLogSize";
 
+    // 
--------------------------------------------------------------------------------------------
+    // metrics for user
+    // 
--------------------------------------------------------------------------------------------
+    public static final String BYTES_IN = "bytesIn";
+    public static final String BYTES_OUT = "bytesOut";
+
     // 
--------------------------------------------------------------------------------------------
     // metrics for table
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
new file mode 100644
index 000000000..1a70bcda4
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.entity;
+
+import org.apache.fluss.security.acl.FlussPrincipal;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** The context information of the user who writes to or reads from a table. */
+public class UserContext {
+    private final FlussPrincipal principal;
+
+    public UserContext(FlussPrincipal principal) {
+        this.principal = checkNotNull(principal);
+    }
+
+    public FlussPrincipal getPrincipal() {
+        return principal;
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java
new file mode 100644
index 000000000..95ae062fc
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.UserContext;
+import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
+import org.apache.fluss.server.metrics.group.UserMetricGroup;
+import org.apache.fluss.server.metrics.group.UserPerTableMetricGroup;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Designed to manage and clean up user-level metrics (will be used for future 
quota management).
+ *
+ * <p>To be more specific, Fluss server maintains three kinds of metrics:
+ *
+ * <ul>
+ *   <li>1. Server-level metrics which are never expired or dropped, such as 
"tableCount".
+ *   <li>2. Table-level metrics which are dropped when the table is deleted, 
such as
+ *       "bytesInPerSecond" for a table.
+ *   <li>3. User-level metrics which will be expired and dropped after a 
period of inactivity, such
+ *       as "bytesInRate" for a user.
+ * </ul>
+ *
+ * <p>This class mainly manages the user-level metrics. There may be a large 
number of users that read/write
+ * tables, but most of them become idle after a period of time. To avoid memory 
leaks or GC overhead, we
+ * need to clean up those inactive user-level metrics periodically.
+ */
+public class UserMetrics implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(UserMetrics.class);
+    private static final long INACTIVE_METRIC_EXPIRATION_TIME_MS = 3600_000L; 
// 1 hour
+    private static final long METRICS_CLEANUP_INTERVAL_MS = 30_000L; // 30s
+
+    private final long inactiveMetricExpirationTimeMs;
+    private final MetricRegistry metricRegistry;
+    private final TabletServerMetricGroup parentMetricGroup;
+    private final ScheduledFuture<?> schedule;
+
+    private final ConcurrentMap<MetricKey, AbstractUserMetricGroup> metrics =
+            MapUtils.newConcurrentHashMap();
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    public UserMetrics(
+            Scheduler cleanupScheduler,
+            MetricRegistry metricRegistry,
+            TabletServerMetricGroup parentMetricGroup) {
+        this(
+                INACTIVE_METRIC_EXPIRATION_TIME_MS,
+                METRICS_CLEANUP_INTERVAL_MS,
+                cleanupScheduler,
+                metricRegistry,
+                parentMetricGroup);
+    }
+
+    @VisibleForTesting
+    UserMetrics(
+            long inactiveMetricExpirationTimeMs,
+            long cleanupIntervalMs,
+            Scheduler cleanupScheduler,
+            MetricRegistry metricRegistry,
+            TabletServerMetricGroup parentMetricGroup) {
+        this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
+        this.metricRegistry = metricRegistry;
+        this.parentMetricGroup = parentMetricGroup;
+        this.schedule =
+                cleanupScheduler.schedule(
+                        "user-metrics-expired-cleanup-task",
+                        new ExpiredMetricCleanupTask(),
+                        cleanupIntervalMs,
+                        cleanupIntervalMs);
+    }
+
+    protected AbstractUserMetricGroup getOrCreateMetric(MetricKey metricKey) {
+        return metrics.computeIfAbsent(
+                metricKey,
+                key -> {
+                    if (metricKey.tablePath != null) {
+                        return new UserPerTableMetricGroup(
+                                metricRegistry,
+                                key.userName,
+                                key.tablePath,
+                                inactiveMetricExpirationTimeMs,
+                                parentMetricGroup);
+                    } else {
+                        return new UserMetricGroup(
+                                metricRegistry,
+                                key.userName,
+                                inactiveMetricExpirationTimeMs,
+                                parentMetricGroup);
+                    }
+                });
+    }
+
+    /** Increments the number of bytes written by a user on a specific table. 
*/
+    public void incBytesIn(@Nullable UserContext userContext, TablePath 
tablePath, long numBytes) {
+        incBytes(userContext, tablePath, numBytes, true);
+    }
+
+    /** Increments the number of bytes read by a user on a specific table. */
+    public void incBytesOut(@Nullable UserContext userContext, TablePath 
tablePath, long numBytes) {
+        incBytes(userContext, tablePath, numBytes, false);
+    }
+
+    private void incBytes(
+            @Nullable UserContext userContext,
+            TablePath tablePath,
+            long numBytes,
+            boolean bytesIn) {
+        if (userContext == null
+                || userContext.getPrincipal() == FlussPrincipal.ANY
+                || userContext.getPrincipal() == 
FlussPrincipal.WILD_CARD_PRINCIPAL
+                || userContext.getPrincipal() == FlussPrincipal.ANONYMOUS) {
+            // Ignore null or anonymous or wildcard users
+            return;
+        }
+        String userName = userContext.getPrincipal().getName();
+        AbstractUserMetricGroup user = getOrCreateMetric(new 
MetricKey(userName, null));
+        AbstractUserMetricGroup perTable = getOrCreateMetric(new 
MetricKey(userName, tablePath));
+        if (bytesIn) {
+            user.incBytesIn(numBytes);
+            perTable.incBytesIn(numBytes);
+        } else {
+            user.incBytesOut(numBytes);
+            perTable.incBytesOut(numBytes);
+        }
+    }
+
+    @VisibleForTesting
+    int numMetrics() {
+        return metrics.size();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (isClosed.compareAndSet(false, true)) {
+            schedule.cancel(true);
+            for (AbstractMetricGroup metricGroup : metrics.values()) {
+                metricGroup.close();
+            }
+            metrics.clear();
+        }
+    }
+
+    /** A periodic task to clean up expired user metrics. */
+    private class ExpiredMetricCleanupTask implements Runnable {
+
+        @Override
+        public void run() {
+            for (Map.Entry<MetricKey, AbstractUserMetricGroup> metricEntry : 
metrics.entrySet()) {
+                MetricKey metricName = metricEntry.getKey();
+                AbstractUserMetricGroup userMetric = metricEntry.getValue();
+                synchronized (userMetric) {
+                    if (userMetric.hasExpired()) {
+                        LOG.debug("Removing expired user metric [{}]", 
metricName);
+                        metrics.remove(metricName);
+                        userMetric.close();
+                    }
+                }
+            }
+        }
+    }
+
+    /** The key to identify user metrics. */
+    protected static class MetricKey {
+        final String userName;
+        @Nullable final TablePath tablePath;
+
+        MetricKey(String userName, @Nullable TablePath tablePath) {
+            this.userName = userName;
+            this.tablePath = tablePath;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            MetricKey metricKey = (MetricKey) o;
+            return Objects.equals(userName, metricKey.userName)
+                    && Objects.equals(tablePath, metricKey.tablePath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(userName, tablePath);
+        }
+
+        @Override
+        public String toString() {
+            if (tablePath == null) {
+                return userName;
+            } else {
+                return userName + ":" + tablePath;
+            }
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
new file mode 100644
index 000000000..080c72315
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+import java.util.Map;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** Abstract metric group for users in the server that tracks inactivity-based expiration. */
+public abstract class AbstractUserMetricGroup extends AbstractMetricGroup {
+    private static final String NAME = "user";
+
+    private final String principalName;
+    private final long inactiveMetricExpirationTimeMs;
+    protected final Counter bytesIn;
+    protected final Counter bytesOut;
+
+    /** The last record time for the group. */
+    protected volatile long lastRecordTime;
+
+    public AbstractUserMetricGroup(
+            MetricRegistry registry,
+            String principalName,
+            long inactiveMetricExpirationTimeMs,
+            TabletServerMetricGroup tabletServerMetricGroup) {
+        super(registry, makeScope(tabletServerMetricGroup, principalName), 
tabletServerMetricGroup);
+        this.principalName = checkNotNull(principalName);
+        this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
+
+        this.bytesIn = new ThreadSafeSimpleCounter();
+        this.bytesOut = new ThreadSafeSimpleCounter();
+
+        this.lastRecordTime = System.currentTimeMillis();
+    }
+
+    @Override
+    protected String getGroupName(CharacterFilter filter) {
+        return NAME;
+    }
+
+    @VisibleForTesting
+    public String getPrincipalName() {
+        return principalName;
+    }
+
+    @Override
+    protected void putVariables(Map<String, String> variables) {
+        variables.put("user", principalName);
+    }
+
+    public void incBytesIn(long numBytes) {
+        this.lastRecordTime = System.currentTimeMillis();
+        bytesIn.inc(numBytes);
+    }
+
+    public void incBytesOut(long numBytes) {
+        this.lastRecordTime = System.currentTimeMillis();
+        bytesOut.inc(numBytes);
+    }
+
+    /** Returns true if the metric is eligible for removal due to inactivity, 
false otherwise. */
+    public boolean hasExpired() {
+        return (System.currentTimeMillis() - lastRecordTime) > 
this.inactiveMetricExpirationTimeMs;
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
new file mode 100644
index 000000000..7a4a0f53e
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metrics.MeterView;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+/**
+ * Metrics for the overall user level in server with {@link 
TabletServerMetricGroup} as parent
+ * group.
+ */
+public class UserMetricGroup extends AbstractUserMetricGroup {
+
+    public UserMetricGroup(
+            MetricRegistry registry,
+            String principalName,
+            long inactiveMetricExpirationTimeMs,
+            TabletServerMetricGroup tabletServerMetricGroup) {
+        super(registry, principalName, inactiveMetricExpirationTimeMs, 
tabletServerMetricGroup);
+
+        meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
+        meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
new file mode 100644
index 000000000..55d9cd82a
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+import java.util.Map;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Metrics for the users per table in server with {@link 
TabletServerMetricGroup} as parent group.
+ */
+public class UserPerTableMetricGroup extends AbstractUserMetricGroup {
+
+    private final TablePath tablePath;
+
+    public UserPerTableMetricGroup(
+            MetricRegistry registry,
+            String principalName,
+            TablePath tablePath,
+            long inactiveMetricExpirationTimeMs,
+            TabletServerMetricGroup tabletServerMetricGroup) {
+        super(registry, principalName, inactiveMetricExpirationTimeMs, 
tabletServerMetricGroup);
+        this.tablePath = checkNotNull(tablePath);
+
+        // only track counters for per-table user metrics for billing purposes,
+        // the corresponding rates are tracked at the overall user level
+        counter(MetricNames.BYTES_IN, bytesIn);
+        counter(MetricNames.BYTES_OUT, bytesOut);
+    }
+
+    @Override
+    protected void putVariables(Map<String, String> variables) {
+        super.putVariables(variables);
+        variables.put("database", tablePath.getDatabaseName());
+        variables.put("table", tablePath.getTableName());
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 5eb19da65..d9350257d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -67,6 +67,7 @@ import 
org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
 import org.apache.fluss.server.entity.NotifyRemoteLogOffsetsData;
 import org.apache.fluss.server.entity.StopReplicaData;
 import org.apache.fluss.server.entity.StopReplicaResultForBucket;
+import org.apache.fluss.server.entity.UserContext;
 import org.apache.fluss.server.kv.KvManager;
 import org.apache.fluss.server.kv.KvSnapshotResource;
 import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter;
@@ -84,6 +85,7 @@ import 
org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
 import org.apache.fluss.server.log.remote.RemoteLogManager;
 import org.apache.fluss.server.metadata.ClusterMetadata;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
+import org.apache.fluss.server.metrics.UserMetrics;
 import org.apache.fluss.server.metrics.group.BucketMetricGroup;
 import org.apache.fluss.server.metrics.group.TableMetricGroup;
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
@@ -186,6 +188,7 @@ public class ReplicaManager {
 
     // for metrics
     private final TabletServerMetricGroup serverMetricGroup;
+    private final UserMetrics userMetrics;
     private final String internalListenerName;
 
     private final Clock clock;
@@ -203,6 +206,7 @@ public class ReplicaManager {
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
             FatalErrorHandler fatalErrorHandler,
             TabletServerMetricGroup serverMetricGroup,
+            UserMetrics userMetrics,
             Clock clock,
             ExecutorService ioExecutor)
             throws IOException {
@@ -219,6 +223,7 @@ public class ReplicaManager {
                 completedKvSnapshotCommitter,
                 fatalErrorHandler,
                 serverMetricGroup,
+                userMetrics,
                 new RemoteLogManager(conf, zkClient, coordinatorGateway, 
clock, ioExecutor),
                 clock,
                 ioExecutor);
@@ -238,6 +243,7 @@ public class ReplicaManager {
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
             FatalErrorHandler fatalErrorHandler,
             TabletServerMetricGroup serverMetricGroup,
+            UserMetrics userMetrics,
             RemoteLogManager remoteLogManager,
             Clock clock,
             ExecutorService ioExecutor)
@@ -284,6 +290,7 @@ public class ReplicaManager {
                         zkClient, completedKvSnapshotCommitter, 
kvSnapshotResource, conf);
         this.remoteLogManager = remoteLogManager;
         this.serverMetricGroup = serverMetricGroup;
+        this.userMetrics = userMetrics;
         this.clock = clock;
         this.ioExecutor = ioExecutor;
         registerMetrics();
@@ -456,6 +463,7 @@ public class ReplicaManager {
             int timeoutMs,
             int requiredAcks,
             Map<TableBucket, MemoryLogRecords> entriesPerBucket,
+            @Nullable UserContext userContext,
             Consumer<List<ProduceLogResultForBucket>> responseCallback) {
         if (isRequiredAcksInvalid(requiredAcks)) {
             throw new InvalidRequiredAcksException("Invalid required acks: " + 
requiredAcks);
@@ -463,7 +471,7 @@ public class ReplicaManager {
 
         long startTime = System.currentTimeMillis();
         Map<TableBucket, ProduceLogResultForBucket> appendResult =
-                appendToLocalLog(entriesPerBucket, requiredAcks);
+                appendToLocalLog(entriesPerBucket, requiredAcks, userContext);
         LOG.debug("Append records to local log in {} ms", 
System.currentTimeMillis() - startTime);
 
         // maybe do delay write operation.
@@ -480,9 +488,11 @@ public class ReplicaManager {
     public void fetchLogRecords(
             FetchParams params,
             Map<TableBucket, FetchReqInfo> bucketFetchInfo,
+            @Nullable UserContext userContext,
             Consumer<Map<TableBucket, FetchLogResultForBucket>> 
responseCallback) {
         long startTime = System.currentTimeMillis();
-        Map<TableBucket, LogReadResult> logReadResults = readFromLog(params, 
bucketFetchInfo);
+        Map<TableBucket, LogReadResult> logReadResults =
+                readFromLog(params, bucketFetchInfo, userContext);
         if (LOG.isTraceEnabled()) {
             LOG.trace(
                     "Fetch log records from local log in {} ms",
@@ -490,7 +500,8 @@ public class ReplicaManager {
         }
 
         // maybe do delay fetch log operation.
-        maybeAddDelayedFetchLog(params, bucketFetchInfo, logReadResults, 
responseCallback);
+        maybeAddDelayedFetchLog(
+                params, bucketFetchInfo, logReadResults, userContext, 
responseCallback);
     }
 
     /**
@@ -925,18 +936,23 @@ public class ReplicaManager {
 
     /** Append log records to leader replicas of the buckets. */
     private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
-            Map<TableBucket, MemoryLogRecords> entriesPerBucket, int 
requiredAcks) {
+            Map<TableBucket, MemoryLogRecords> entriesPerBucket,
+            int requiredAcks,
+            @Nullable UserContext userContext) {
         Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new 
HashMap<>();
         for (Map.Entry<TableBucket, MemoryLogRecords> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
+            MemoryLogRecords records = entry.getValue();
             TableMetricGroup tableMetrics = null;
             try {
                 Replica replica = getReplicaOrException(tb);
                 tableMetrics = replica.tableMetrics();
                 tableMetrics.totalProduceLogRequests().inc();
+                // record user metrics before appending to log,
+                // so that if appending fails, we still account for the bytes.
+                userMetrics.incBytesIn(userContext, replica.getTablePath(), 
records.sizeInBytes());
                 LOG.trace("Append records to local log tablet for table bucket 
{}", tb);
-                LogAppendInfo appendInfo =
-                        replica.appendRecordsToLeader(entry.getValue(), 
requiredAcks);
+                LogAppendInfo appendInfo = 
replica.appendRecordsToLeader(records, requiredAcks);
 
                 long baseOffset = appendInfo.firstOffset();
                 LOG.trace(
@@ -1049,7 +1065,9 @@ public class ReplicaManager {
     }
 
     public Map<TableBucket, LogReadResult> readFromLog(
-            FetchParams fetchParams, Map<TableBucket, FetchReqInfo> 
bucketFetchInfo) {
+            FetchParams fetchParams,
+            Map<TableBucket, FetchReqInfo> bucketFetchInfo,
+            @Nullable UserContext userContext) {
         Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
         boolean isFromFollower = fetchParams.isFromFollower();
         int limitBytes = fetchParams.maxFetchBytes();
@@ -1108,6 +1126,7 @@ public class ReplicaManager {
                     
serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
                 } else {
                     tableMetrics.incLogBytesOut(recordBatchSize);
+                    userMetrics.incBytesOut(userContext, 
replica.getTablePath(), recordBatchSize);
                 }
             } catch (Exception e) {
                 if (isUnexpectedException(e)) {
@@ -1297,6 +1316,7 @@ public class ReplicaManager {
             FetchParams params,
             Map<TableBucket, FetchReqInfo> bucketFetchInfo,
             Map<TableBucket, LogReadResult> logReadResults,
+            @Nullable UserContext userContext,
             Consumer<Map<TableBucket, FetchLogResultForBucket>> 
responseCallback) {
         long bytesReadable = 0;
         boolean errorReadingData = false;
@@ -1363,7 +1383,8 @@ public class ReplicaManager {
                                 delayedResponse.putAll(expectedErrorBuckets);
                                 responseCallback.accept(delayedResponse);
                             },
-                            serverMetricGroup);
+                            serverMetricGroup,
+                            userContext);
 
             // try to complete the request immediately, otherwise put it into 
the
             // delayedFetchLogManager; this is because while the delayed fetch 
log operation is
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
index 9e0c49052..e17b7e1cc 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
@@ -28,6 +28,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
 import org.apache.fluss.rpc.messages.FetchLogRequest;
 import org.apache.fluss.server.entity.FetchReqInfo;
+import org.apache.fluss.server.entity.UserContext;
 import org.apache.fluss.server.log.FetchIsolation;
 import org.apache.fluss.server.log.FetchParams;
 import org.apache.fluss.server.log.LogOffsetMetadata;
@@ -40,6 +41,8 @@ import 
org.apache.fluss.server.replica.ReplicaManager.LogReadResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -59,19 +62,22 @@ public class DelayedFetchLog extends DelayedOperation {
     private final Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap;
     private final Consumer<Map<TableBucket, FetchLogResultForBucket>> 
responseCallback;
     private final TabletServerMetricGroup serverMetricGroup;
+    @Nullable private final UserContext userContext;
 
     public DelayedFetchLog(
             FetchParams params,
             ReplicaManager replicaManager,
             Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap,
             Consumer<Map<TableBucket, FetchLogResultForBucket>> 
responseCallback,
-            TabletServerMetricGroup serverMetricGroup) {
+            TabletServerMetricGroup serverMetricGroup,
+            @Nullable UserContext userContext) {
         super(params.maxWaitMs());
         this.params = params;
         this.replicaManager = replicaManager;
         this.fetchBucketStatusMap = fetchBucketStatusMap;
         this.responseCallback = responseCallback;
         this.serverMetricGroup = serverMetricGroup;
+        this.userContext = userContext;
     }
 
     /** Upon completion, read whatever data is available and pass to the 
complete callback. */
@@ -93,7 +99,7 @@ public class DelayedFetchLog extends DelayedOperation {
 
         // re-fetch data.
         Map<TableBucket, LogReadResult> reReadResult =
-                replicaManager.readFromLog(params, reFetchBuckets);
+                replicaManager.readFromLog(params, reFetchBuckets, 
userContext);
         reReadResult.forEach((key, value) -> result.put(key, 
value.getFetchLogResultForBucket()));
         responseCallback.accept(result);
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 88fa36ff4..bd185ec62 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -46,6 +46,7 @@ import org.apache.fluss.server.log.LogManager;
 import org.apache.fluss.server.log.remote.RemoteLogManager;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
 import org.apache.fluss.server.metrics.ServerMetricUtils;
+import org.apache.fluss.server.metrics.UserMetrics;
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.replica.ReplicaManager;
 import org.apache.fluss.server.zk.ZooKeeperClient;
@@ -127,6 +128,9 @@ public class TabletServer extends ServerBase {
     @GuardedBy("lock")
     private MetricRegistry metricRegistry;
 
+    @GuardedBy("lock")
+    private UserMetrics userMetrics;
+
     @GuardedBy("lock")
     private TabletServerMetricGroup tabletServerMetricGroup;
 
@@ -194,6 +198,9 @@ public class TabletServer extends ServerBase {
 
             List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, 
ServerType.TABLET_SERVER);
 
+            this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
+            scheduler.startup();
+
             // for metrics
             this.metricRegistry = MetricRegistry.create(conf, pluginManager);
             this.tabletServerMetricGroup =
@@ -203,6 +210,7 @@ public class TabletServer extends ServerBase {
                             rack,
                             endpoints.get(0).getHost(),
                             serverId);
+            this.userMetrics = new UserMetrics(scheduler, metricRegistry, 
tabletServerMetricGroup);
 
             this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
 
@@ -216,9 +224,6 @@ public class TabletServer extends ServerBase {
 
             this.metadataCache = new 
TabletServerMetadataCache(metadataManager);
 
-            this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
-            scheduler.startup();
-
             this.logManager =
                     LogManager.create(conf, zkClient, scheduler, clock, 
tabletServerMetricGroup);
             logManager.startup();
@@ -262,6 +267,7 @@ public class TabletServer extends ServerBase {
                                     rpcClient, metadataCache, 
interListenerName),
                             this,
                             tabletServerMetricGroup,
+                            userMetrics,
                             clock,
                             ioExecutor);
             replicaManager.startup();
@@ -408,6 +414,10 @@ public class TabletServer extends ServerBase {
                     clientMetricGroup.close();
                 }
 
+                if (userMetrics != null) {
+                    userMetrics.close();
+                }
+
                 // We must shut down the scheduler early because otherwise, 
the scheduler could
                 // touch other resources that might have been shutdown and 
cause exceptions.
                 if (scheduler != null) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 837bc1b23..9a47b6383 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -70,6 +70,7 @@ import org.apache.fluss.server.authorizer.Authorizer;
 import org.apache.fluss.server.coordinator.MetadataManager;
 import org.apache.fluss.server.entity.FetchReqInfo;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
+import org.apache.fluss.server.entity.UserContext;
 import org.apache.fluss.server.log.FetchParams;
 import org.apache.fluss.server.log.ListOffsetsParam;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
@@ -169,6 +170,7 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
                 request.getTimeoutMs(),
                 request.getAcks(),
                 produceLogData,
+                new UserContext(currentSession().getPrincipal()),
                 bucketResponseMap -> 
response.complete(makeProduceLogResponse(bucketResponseMap)));
         return response;
     }
@@ -193,6 +195,7 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
         replicaManager.fetchLogRecords(
                 fetchParams,
                 interesting,
+                new UserContext(currentSession().getPrincipal()),
                 fetchResponseMap ->
                         response.complete(
                                 makeFetchLogResponse(fetchResponseMap, 
errorResponseMap)));
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
index a40dfe162..3187e5db5 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
@@ -133,6 +133,7 @@ public class RemoteLogITCase {
                 .fetchLogRecords(
                         new FetchParams(-1, Integer.MAX_VALUE),
                         Collections.singletonMap(tb, new FetchReqInfo(tableId, 
0, 10240)),
+                        null,
                         future::complete);
 
         Map<TableBucket, FetchLogResultForBucket> result = future.get();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index 9c91890b6..e2e5be650 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -322,6 +322,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         replicaManager.fetchLogRecords(
                 new FetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 future::complete);
         Map<TableBucket, FetchLogResultForBucket> result = future.get();
         assertThat(result.size()).isEqualTo(1);
@@ -341,6 +342,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         replicaManager.fetchLogRecords(
                 new FetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
47, 1024 * 1024)),
+                null,
                 future::complete);
         result = future.get();
         assertThat(result.size()).isEqualTo(1);
@@ -379,6 +381,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         replicaManager.fetchLogRecords(
                 new FetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 future::complete);
         Map<TableBucket, FetchLogResultForBucket> result = future.get();
         assertThat(result.size()).isEqualTo(1);
@@ -398,6 +401,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         replicaManager.fetchLogRecords(
                 new FetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
35, 1024 * 1024)),
+                null,
                 future::complete);
         result = future.get();
         assertThat(result.size()).isEqualTo(1);
@@ -449,6 +453,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         replicaManager.fetchLogRecords(
                 new FetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
20L, 1024 * 1024)),
+                null,
                 future::complete);
         Map<TableBucket, FetchLogResultForBucket> result = future.get();
         assertThat(result.get(tb).fetchFromRemote()).isFalse();
@@ -459,6 +464,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         replicaManager.fetchLogRecords(
                 new FetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0, 1024 * 1024)),
+                null,
                 future::complete);
         result = future.get();
         assertThat(result.get(tb).fetchFromRemote()).isTrue();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
index 1f0d24d05..05f12a18b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
@@ -82,6 +82,7 @@ final class RemoteLogTTLTest extends RemoteLogTestBase {
         replicaManager.fetchLogRecords(
                 new FetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 future::complete);
         Map<TableBucket, FetchLogResultForBucket> result = future.get();
         assertThat(result.size()).isEqualTo(1);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
new file mode 100644
index 000000000..f94205a5a
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.registry.NOPMetricRegistry;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.UserContext;
+import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static 
org.apache.fluss.server.metrics.group.TestingMetricGroups.TABLET_SERVER_METRICS;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link UserMetrics}. */
+public class UserMetricsTest {
+
+    private FlussScheduler scheduler;
+
+    @BeforeEach
+    void before() {
+        scheduler = new FlussScheduler(2, false);
+        scheduler.startup();
+    }
+
+    @AfterEach
+    void after() throws InterruptedException {
+        scheduler.shutdown();
+    }
+
+    @Test
+    void testMetricExpiration() {
+        // set expiration time to 2s in test, and 100ms check interval
+        UserMetrics userMetrics =
+                new UserMetrics(
+                        Duration.ofSeconds(2).toMillis(),
+                        100L,
+                        scheduler,
+                        NOPMetricRegistry.INSTANCE,
+                        TABLET_SERVER_METRICS);
+
+        String user1 = "user1";
+        String user2 = "user2";
+        UserContext uc1 = new UserContext(new FlussPrincipal(user1, "USER"));
+        UserContext uc2 = new UserContext(new FlussPrincipal(user2, "USER"));
+
+        TablePath t1 = TablePath.of("db1", "table1");
+        TablePath t2 = TablePath.of("db1", "table2");
+
+        userMetrics.incBytesIn(null, t1, 100);
+        userMetrics.incBytesOut(null, t2, 200);
+        userMetrics.incBytesOut(new UserContext(FlussPrincipal.ANY), t1, 200);
+        userMetrics.incBytesIn(new UserContext(FlussPrincipal.ANONYMOUS), t1, 
300);
+        userMetrics.incBytesOut(new 
UserContext(FlussPrincipal.WILD_CARD_PRINCIPAL), t2, 100);
+
+        // ignore null/anonymous/wildcard user
+        assertThat(userMetrics.numMetrics()).isEqualTo(0);
+
+        userMetrics.incBytesIn(uc1, t1, 100);
+        userMetrics.incBytesOut(uc1, t2, 100);
+        userMetrics.incBytesOut(uc2, t2, 200);
+
+        // u1, u1+t1, u1+t2, u2, u2+t2
+        assertThat(userMetrics.numMetrics()).isEqualTo(5);
+
+        scheduler.schedule(
+                "updating-u1t1",
+                () -> {
+                    userMetrics.incBytesIn(uc1, t1, 50);
+                },
+                0L,
+                50L);
+
+        AbstractUserMetricGroup u1t1 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, 
t1));
+        AbstractUserMetricGroup u1t2 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, 
t2));
+        AbstractUserMetricGroup u2t2 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2, 
t2));
+        AbstractUserMetricGroup u1 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1, 
null));
+        AbstractUserMetricGroup u2 =
+                userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2, 
null));
+
+        // wait long enough for the inactive metrics to expire
+        retry(
+                Duration.ofSeconds(10),
+                () -> {
+                    // only u1 and u1+t1 are retained.
+                    assertThat(userMetrics.numMetrics()).isEqualTo(2);
+                });
+
+        assertThat(u1t2.isClosed()).isTrue();
+        assertThat(u2t2.isClosed()).isTrue();
+        assertThat(u2.isClosed()).isTrue();
+
+        assertThat(u1.isClosed()).isFalse();
+        assertThat(u1t1.isClosed()).isFalse();
+        assertThat(userMetrics.getOrCreateMetric(new 
UserMetrics.MetricKey(user1, null)))
+                .isSameAs(u1);
+        assertThat(userMetrics.getOrCreateMetric(new 
UserMetrics.MetricKey(user1, t1)))
+                .isSameAs(u1t1);
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
index 15d1316b5..b2e93f2bb 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
@@ -19,6 +19,11 @@ package org.apache.fluss.server.metrics.group;
 
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.registry.NOPMetricRegistry;
+import org.apache.fluss.server.metrics.UserMetrics;
+import org.apache.fluss.testutils.common.ScheduledTask;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import java.util.concurrent.ScheduledFuture;
 
 /** Utilities for various metric groups for testing. */
 public class TestingMetricGroups {
@@ -38,4 +43,30 @@ public class TestingMetricGroups {
 
     public static final BucketMetricGroup BUCKET_METRICS =
             new BucketMetricGroup(NOPMetricRegistry.INSTANCE, null, 0, 
TABLE_METRICS);
+
+    public static final UserMetrics USER_METRICS =
+            new UserMetrics(
+                    new TestingScheduler(), NOPMetricRegistry.INSTANCE, 
TABLET_SERVER_METRICS);
+
+    // 
------------------------------------------------------------------------------------------
+
+    private static class TestingScheduler implements Scheduler {
+        @Override
+        public void startup() {
+            // no-op
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException {
+            // no-op
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(
+                String name, Runnable task, long delayMs, long periodMs) {
+            // Directly run the task for testing purpose.
+            task.run();
+            return new ScheduledTask<>(() -> null, delayMs, periodMs);
+        }
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
index 64eade999..1779b96f8 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
@@ -67,6 +67,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
 
@@ -77,6 +78,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
                         2, (int) 
conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()),
                 Collections.singletonMap(
                         tb, new FetchReqInfo(tb.getTableId(), 10L, 
Integer.MAX_VALUE)),
+                null,
                 result -> {});
         retry(
                 Duration.ofSeconds(20),
@@ -92,6 +94,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
                         3, (int) 
conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()),
                 Collections.singletonMap(
                         tb, new FetchReqInfo(tb.getTableId(), 10L, 
Integer.MAX_VALUE)),
+                null,
                 result -> {});
         retry(
                 Duration.ofSeconds(20),
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 6ec71bc79..7db23130f 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -145,6 +145,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
 
@@ -154,6 +155,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 10L, 20L));
 
@@ -165,6 +167,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                                         100,
                                         Collections.singletonMap(
                                                 tb, 
genMemoryLogRecordsByObject(DATA1)),
+                                        null,
                                         (result) -> {
                                             // do nothing.
                                         }))
@@ -179,6 +182,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(unknownBucket, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get())
                 .containsOnly(
@@ -203,6 +207,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 emptyFuture::complete);
         Map<TableBucket, FetchLogResultForBucket> result = emptyFuture.get();
         assertThat(result.size()).isEqualTo(1);
@@ -217,6 +222,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
 
@@ -226,6 +232,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 future1::complete);
         result = future1.get();
         assertThat(result.size()).isEqualTo(1);
@@ -241,6 +248,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
3L, 1024 * 1024)),
+                null,
                 future1::complete);
         result = future1.get();
         assertThat(result.size()).isEqualTo(1);
@@ -257,6 +265,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 10L, 20L));
 
@@ -265,6 +274,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
10L, 1024 * 1024)),
+                null,
                 future1::complete);
         result = future1.get();
         assertThat(result.size()).isEqualTo(1);
@@ -280,6 +290,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
100L, 1024 * 1024)),
+                null,
                 future1::complete);
         result = future1.get();
         assertThat(result.size()).isEqualTo(1);
@@ -297,6 +308,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
20L, 1024 * 1024)),
+                null,
                 future1::complete);
         result = future1.get();
         assertThat(result.size()).isEqualTo(1);
@@ -319,7 +331,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         int maxFetchBytesSize = batchSize + 10;
         CompletableFuture<List<ProduceLogResultForBucket>> future = new 
CompletableFuture<>();
         replicaManager.appendRecordsToLog(
-                20000, 1, Collections.singletonMap(tb, records1), 
future::complete);
+                20000, 1, Collections.singletonMap(tb, records1), null, 
future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
 
         // fetch from this bucket from offset 0 with fetch max bytes size 
bigger that data1 batch
@@ -330,6 +342,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 buildFetchParams(-1, maxFetchBytesSize),
                 Collections.singletonMap(
                         tb, new FetchReqInfo(tb.getTableId(), 0L, 
Integer.MAX_VALUE)),
+                null,
                 future1::complete);
         Map<TableBucket, FetchLogResultForBucket> result = future1.get();
         assertThat(result.size()).isEqualTo(1);
@@ -347,6 +360,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(ANOTHER_DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 10L, 20L));
 
@@ -356,6 +370,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 buildFetchParams(-1),
                 Collections.singletonMap(
                         tb, new FetchReqInfo(tb.getTableId(), 0, 
Integer.MAX_VALUE)),
+                null,
                 future1::complete);
         result = future1.get();
         resultForBucket = result.get(tb);
@@ -374,6 +389,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 buildFetchParams(-1, maxFetchBytesSize),
                 Collections.singletonMap(
                         tb, new FetchReqInfo(tb.getTableId(), 0, 
Integer.MAX_VALUE)),
+                null,
                 future1::complete);
         result = future1.get();
         resultForBucket = result.get(tb);
@@ -397,7 +413,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         Map<TableBucket, MemoryLogRecords> data = new HashMap<>();
         data.put(tb1, genMemoryLogRecordsByObject(DATA1));
         data.put(tb2, genMemoryLogRecordsByObject(DATA1));
-        replicaManager.appendRecordsToLog(20000, 1, data, future::complete);
+        replicaManager.appendRecordsToLog(20000, 1, data, null, 
future::complete);
         assertThat(future.get())
                 .containsExactlyInAnyOrder(
                         new ProduceLogResultForBucket(tb1, 0, 10L),
@@ -408,7 +424,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         data = new HashMap<>();
         data.put(tb1, genMemoryLogRecordsByObject(ANOTHER_DATA1));
         data.put(tb2, genMemoryLogRecordsByObject(ANOTHER_DATA1));
-        replicaManager.appendRecordsToLog(20000, 1, data, future::complete);
+        replicaManager.appendRecordsToLog(20000, 1, data, null, 
future::complete);
         assertThat(future.get())
                 .containsExactlyInAnyOrder(
                         new ProduceLogResultForBucket(tb1, 10L, 20L),
@@ -421,7 +437,8 @@ class ReplicaManagerTest extends ReplicaTestBase {
         Map<TableBucket, FetchReqInfo> newFetchData = new HashMap<>();
         newFetchData.put(tb1, new FetchReqInfo(tb1.getTableId(), 0, 
Integer.MAX_VALUE));
         newFetchData.put(tb2, new FetchReqInfo(tb2.getTableId(), 0, 
Integer.MAX_VALUE));
-        replicaManager.fetchLogRecords(buildFetchParams(-1, 10), newFetchData, 
future1::complete);
+        replicaManager.fetchLogRecords(
+                buildFetchParams(-1, 10), newFetchData, null, 
future1::complete);
         Map<TableBucket, FetchLogResultForBucket> result = future1.get();
         assertThat(result.size()).isEqualTo(2);
         List<FetchLogResultForBucket> resultList = new 
ArrayList<>(result.values());
@@ -533,6 +550,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 future1::complete);
         FetchLogResultForBucket resultForBucket = future1.get().get(tb);
         assertThat(resultForBucket.getHighWatermark()).isEqualTo(5L);
@@ -574,6 +592,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 future1::complete);
         resultForBucket = future1.get().get(tb);
         assertThat(resultForBucket.getHighWatermark()).isEqualTo(5L);
@@ -612,6 +631,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.fetchLogRecords(
                 buildFetchParams(-1),
                 Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 
0L, 1024 * 1024)),
+                null,
                 future1::complete);
         resultForBucket = future1.get().get(tb);
         assertThat(resultForBucket.getHighWatermark()).isEqualTo(8L);
@@ -667,6 +687,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 buildFetchParams(-1),
                 Collections.singletonMap(
                         tb, new FetchReqInfo(tb.getTableId(), 0L, 
Integer.MAX_VALUE)),
+                null,
                 future1::complete);
         FetchLogResultForBucket resultForBucket = future1.get().get(tb);
         assertThat(resultForBucket.getHighWatermark()).isEqualTo(18L);
@@ -892,6 +913,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
         // produce another batch to this bucket.
@@ -900,6 +922,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(ANOTHER_DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 10, 20L));
 
@@ -928,6 +951,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 20000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
 
@@ -984,6 +1008,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                     20000,
                     1,
                     Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                    null,
                     future::complete);
             future.get();
             // advance clock to generate different batch commit timestamp.
@@ -1005,6 +1030,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 buildFetchParams(-1, Integer.MAX_VALUE),
                 Collections.singletonMap(
                         tb, new FetchReqInfo(tb.getTableId(), 0L, 
Integer.MAX_VALUE)),
+                null,
                 future::complete);
         Map<Long, Long> offsetToCommitTimestampMap =
                 startOffsetToBatchCommitTimestamp(future.get().get(tb));
@@ -1082,6 +1108,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 300000,
                 -1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
     }
@@ -1809,7 +1836,8 @@ class ReplicaManagerTest extends ReplicaTestBase {
         for (TableBucket tb : tableBuckets) {
             fetchData.put(tb, new FetchReqInfo(tb.getTableId(), 0L, 1024 * 
1024));
         }
-        replicaManager.fetchLogRecords(buildFetchParams(-1), fetchData, 
fetchLogFuture::complete);
+        replicaManager.fetchLogRecords(
+                buildFetchParams(-1), fetchData, null, 
fetchLogFuture::complete);
         return fetchLogFuture.get();
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 81cc34aad..559353df6 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -303,6 +303,7 @@ public class ReplicaTestBase {
                 snapshotReporter,
                 NOPErrorHandler.INSTANCE,
                 TestingMetricGroups.TABLET_SERVER_METRICS,
+                TestingMetricGroups.USER_METRICS,
                 remoteLogManager,
                 manualClock,
                 ioExecutor);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
index 7822a0a4a..73d620842 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
@@ -90,6 +90,7 @@ public class DelayedFetchLogTest extends ReplicaTestBase {
                 20000,
                 -1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
 
@@ -161,6 +162,7 @@ public class DelayedFetchLogTest extends ReplicaTestBase {
                 replicaManager,
                 Collections.singletonMap(tb, prevFetchBucketStatus),
                 responseCallback,
-                TestingMetricGroups.TABLET_SERVER_METRICS);
+                TestingMetricGroups.TABLET_SERVER_METRICS,
+                null);
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
index 0b149abee..1e787a345 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
@@ -157,6 +157,7 @@ public class ReplicaFetcherITCase {
             replicaManager.fetchLogRecords(
                     new FetchParams(-1, false, Integer.MAX_VALUE, -1, -1),
                     Collections.singletonMap(tb, new FetchReqInfo(tableId, 0L, 
1024 * 1024)),
+                    null,
                     future::complete);
             Map<TableBucket, FetchLogResultForBucket> result = future.get();
             assertThat(result.size()).isEqualTo(1);
@@ -233,6 +234,7 @@ public class ReplicaFetcherITCase {
             replicaManager.fetchLogRecords(
                     new FetchParams(-1, false, Integer.MAX_VALUE, -1, -1),
                     Collections.singletonMap(tb, new FetchReqInfo(tableId, 0L, 
1024 * 1024)),
+                    null,
                     future::complete);
             Map<TableBucket, FetchLogResultForBucket> result = future.get();
             assertThat(result.size()).isEqualTo(1);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 57f6be484..c857d84de 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -76,6 +76,7 @@ import static 
org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static 
org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
+import static 
org.apache.fluss.server.metrics.group.TestingMetricGroups.USER_METRICS;
 import static 
org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH;
 import static 
org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH;
 import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
@@ -152,6 +153,7 @@ public class ReplicaFetcherThreadTest {
                 1000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0, 10L));
 
@@ -175,6 +177,7 @@ public class ReplicaFetcherThreadTest {
                 1000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 10L, 20L));
         retry(
@@ -208,6 +211,7 @@ public class ReplicaFetcherThreadTest {
                     1000,
                     1, // don't wait ack
                     Collections.singletonMap(tb, 
genMemoryLogRecordsByObject(DATA1)),
+                    null,
                     future::complete);
             assertThat(future.get())
                     .containsOnly(new ProduceLogResultForBucket(tb, 
baseOffset, baseOffset + 10L));
@@ -240,6 +244,7 @@ public class ReplicaFetcherThreadTest {
                         1,
                         Collections.singletonMap(
                                 tb, genMemoryLogRecordsWithWriterId(DATA1, 
writerId, i, 0)),
+                        null,
                         future::complete);
                 assertThat(future.get())
                         .containsOnly(
@@ -274,6 +279,7 @@ public class ReplicaFetcherThreadTest {
                 1000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsWithWriterId(DATA1, 101L, 5, 100L)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 100L, 110L));
 
@@ -285,6 +291,7 @@ public class ReplicaFetcherThreadTest {
                 1000,
                 1,
                 Collections.singletonMap(tb, 
genMemoryLogRecordsWithWriterId(DATA1, 100L, 5, 110L)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 110L, 120L));
         retry(
@@ -306,6 +313,7 @@ public class ReplicaFetcherThreadTest {
                 1,
                 Collections.singletonMap(
                         tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 
0, 0)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0L, 10L));
 
@@ -326,6 +334,7 @@ public class ReplicaFetcherThreadTest {
                 1,
                 Collections.singletonMap(
                         tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 
1, 0)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 10L, 20L));
 
@@ -355,6 +364,7 @@ public class ReplicaFetcherThreadTest {
                 1,
                 Collections.singletonMap(
                         tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 
2, 0)),
+                null,
                 future::complete);
         assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 20L, 30L));
         // now fetcher will work well since the state of writerId=101 is 
established
@@ -469,6 +479,7 @@ public class ReplicaFetcherThreadTest {
                     new TestingCompletedKvSnapshotCommitter(),
                     NOPErrorHandler.INSTANCE,
                     serverMetricGroup,
+                    USER_METRICS,
                     clock,
                     ioExecutor);
         }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
index ecae06d5d..67e8b0ac6 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
@@ -92,6 +92,7 @@ public class TestingLeaderEndpoint implements LeaderEndpoint {
                 new FetchParams(
                         fetchLogRequest.getFollowerServerId(), 
fetchLogRequest.getMaxBytes()),
                 fetchLogData,
+                null,
                 result ->
                         response.complete(
                                 new FetchData(new FetchLogResponse(), 
processResult(result))));
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index 6f2eb5156..2d8ef8077 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -380,7 +380,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="29"><strong>tabletserver</strong></th>
+      <th rowspan="33"><strong>tabletserver</strong></th>
       <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="25">-</td>
       <td>messagesInPerSecond</td>
       <td>The number of messages written per second to this server.</td>
@@ -528,6 +528,27 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The physical remote log size managed by this TabletServer.</td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td rowspan="4">user</td>
+      <td>bytesIn</td>
+      <td>The total number of bytes written to this server, labeled with the 
<code>user</code>, <code>database</code>, and <code>table</code> 
names.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>bytesOut</td>
+      <td>The total number of bytes read from this server, labeled with the 
<code>user</code>, <code>database</code>, and <code>table</code> 
names.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>bytesInPerSecond</td>
+      <td>The number of bytes written per second to this server labeled with 
<code>user</code>.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>bytesOutPerSecond</td>
+      <td>The number of bytes read per second from this server labeled with 
<code>user</code>.</td>
+      <td>Meter</td>
+    </tr>
   </tbody>
 </table>
 

Reply via email to