This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 459c763c3 [metric] Add user-level metrics for byteIn and byteOut
(#2080)
459c763c3 is described below
commit 459c763c3787a980d2c6cf663bb433c0c6bade75
Author: xiaozhou <[email protected]>
AuthorDate: Sun Dec 21 19:13:40 2025 +0800
[metric] Add user-level metrics for byteIn and byteOut (#2080)
Co-authored-by: Jark Wu <[email protected]>
---
.../java/org/apache/fluss/metrics/MetricNames.java | 6 +
.../apache/fluss/server/entity/UserContext.java | 36 ++++
.../apache/fluss/server/metrics/UserMetrics.java | 231 +++++++++++++++++++++
.../metrics/group/AbstractUserMetricGroup.java | 90 ++++++++
.../server/metrics/group/UserMetricGroup.java | 42 ++++
.../metrics/group/UserPerTableMetricGroup.java | 58 ++++++
.../fluss/server/replica/ReplicaManager.java | 37 +++-
.../server/replica/delay/DelayedFetchLog.java | 10 +-
.../apache/fluss/server/tablet/TabletServer.java | 16 +-
.../apache/fluss/server/tablet/TabletService.java | 3 +
.../fluss/server/log/remote/RemoteLogITCase.java | 1 +
.../server/log/remote/RemoteLogManagerTest.java | 6 +
.../fluss/server/log/remote/RemoteLogTTLTest.java | 1 +
.../fluss/server/metrics/UserMetricsTest.java | 128 ++++++++++++
.../server/metrics/group/TestingMetricGroups.java | 31 +++
.../apache/fluss/server/replica/AdjustIsrTest.java | 3 +
.../fluss/server/replica/ReplicaManagerTest.java | 38 +++-
.../fluss/server/replica/ReplicaTestBase.java | 1 +
.../server/replica/delay/DelayedFetchLogTest.java | 4 +-
.../replica/fetcher/ReplicaFetcherITCase.java | 2 +
.../replica/fetcher/ReplicaFetcherThreadTest.java | 11 +
.../replica/fetcher/TestingLeaderEndpoint.java | 1 +
.../maintenance/observability/monitor-metrics.md | 23 +-
23 files changed, 759 insertions(+), 20 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 4008c6b19..b1326d46b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -74,6 +74,12 @@ public class MetricNames {
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE =
"localSize";
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE =
"remoteLogSize";
+ //
--------------------------------------------------------------------------------------------
+ // metrics for user
+ //
--------------------------------------------------------------------------------------------
+ public static final String BYTES_IN = "bytesIn";
+ public static final String BYTES_OUT = "bytesOut";
+
//
--------------------------------------------------------------------------------------------
// metrics for table
//
--------------------------------------------------------------------------------------------
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
new file mode 100644
index 000000000..1a70bcda4
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/UserContext.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.entity;
+
+import org.apache.fluss.security.acl.FlussPrincipal;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** The context information of user who writes or reads table. */
+public class UserContext {
+ private final FlussPrincipal principal;
+
+ public UserContext(FlussPrincipal principal) {
+ this.principal = checkNotNull(principal);
+ }
+
+ public FlussPrincipal getPrincipal() {
+ return principal;
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java
new file mode 100644
index 000000000..95ae062fc
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.UserContext;
+import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
+import org.apache.fluss.server.metrics.group.UserMetricGroup;
+import org.apache.fluss.server.metrics.group.UserPerTableMetricGroup;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Designed to manage and cleanup user-level metrics (will be used for future
quota management).
+ *
+ * <p>To be more specific, Fluss server maintains three kind of metrics:
+ *
+ * <ul>
+ * <li>1. Server-level metrics which is never expired or dropped, such as
"tableCount".
+ * <li>2. Table-level metrics which is dropped when the table is deleted,
such as
+ * "bytesInPerSecond" for a table.
+ * <li>3. User-level metrics which will be expired and dropped after a
period of inactivity, such
+ * as "bytesInRate" for a user.
+ * </ul>
+ *
+ * <p>This class mainly manages the user-level metrics. There are many a lot
of users to read/write
+ * tables, but most of them are idle after a period of time. To avoid memory
leak or GC overhead, we
+ * need to clean up those inactive user-level metrics periodically.
+ */
+public class UserMetrics implements AutoCloseable {
+ private static final Logger LOG =
LoggerFactory.getLogger(UserMetrics.class);
+ private static final long INACTIVE_METRIC_EXPIRATION_TIME_MS = 3600_000L;
// 1 hour
+ private static final long METRICS_CLEANUP_INTERVAL_MS = 30_000L; // 30s
+
+ private final long inactiveMetricExpirationTimeMs;
+ private final MetricRegistry metricRegistry;
+ private final TabletServerMetricGroup parentMetricGroup;
+ private final ScheduledFuture<?> schedule;
+
+ private final ConcurrentMap<MetricKey, AbstractUserMetricGroup> metrics =
+ MapUtils.newConcurrentHashMap();
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ public UserMetrics(
+ Scheduler cleanupScheduler,
+ MetricRegistry metricRegistry,
+ TabletServerMetricGroup parentMetricGroup) {
+ this(
+ INACTIVE_METRIC_EXPIRATION_TIME_MS,
+ METRICS_CLEANUP_INTERVAL_MS,
+ cleanupScheduler,
+ metricRegistry,
+ parentMetricGroup);
+ }
+
+ @VisibleForTesting
+ UserMetrics(
+ long inactiveMetricExpirationTimeMs,
+ long cleanupIntervalMs,
+ Scheduler cleanupScheduler,
+ MetricRegistry metricRegistry,
+ TabletServerMetricGroup parentMetricGroup) {
+ this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
+ this.metricRegistry = metricRegistry;
+ this.parentMetricGroup = parentMetricGroup;
+ this.schedule =
+ cleanupScheduler.schedule(
+ "user-metrics-expired-cleanup-task",
+ new ExpiredMetricCleanupTask(),
+ cleanupIntervalMs,
+ cleanupIntervalMs);
+ }
+
+ protected AbstractUserMetricGroup getOrCreateMetric(MetricKey metricKey) {
+ return metrics.computeIfAbsent(
+ metricKey,
+ key -> {
+ if (metricKey.tablePath != null) {
+ return new UserPerTableMetricGroup(
+ metricRegistry,
+ key.userName,
+ key.tablePath,
+ inactiveMetricExpirationTimeMs,
+ parentMetricGroup);
+ } else {
+ return new UserMetricGroup(
+ metricRegistry,
+ key.userName,
+ inactiveMetricExpirationTimeMs,
+ parentMetricGroup);
+ }
+ });
+ }
+
+ /** Increments the number of bytes written by a user on a specific table.
*/
+ public void incBytesIn(@Nullable UserContext userContext, TablePath
tablePath, long numBytes) {
+ incBytes(userContext, tablePath, numBytes, true);
+ }
+
+ /** Increments the number of bytes read by a user on a specific table. */
+ public void incBytesOut(@Nullable UserContext userContext, TablePath
tablePath, long numBytes) {
+ incBytes(userContext, tablePath, numBytes, false);
+ }
+
+ private void incBytes(
+ @Nullable UserContext userContext,
+ TablePath tablePath,
+ long numBytes,
+ boolean bytesIn) {
+ if (userContext == null
+ || userContext.getPrincipal() == FlussPrincipal.ANY
+ || userContext.getPrincipal() ==
FlussPrincipal.WILD_CARD_PRINCIPAL
+ || userContext.getPrincipal() == FlussPrincipal.ANONYMOUS) {
+ // Ignore null or anonymous or wildcard users
+ return;
+ }
+ String userName = userContext.getPrincipal().getName();
+ AbstractUserMetricGroup user = getOrCreateMetric(new
MetricKey(userName, null));
+ AbstractUserMetricGroup perTable = getOrCreateMetric(new
MetricKey(userName, tablePath));
+ if (bytesIn) {
+ user.incBytesIn(numBytes);
+ perTable.incBytesIn(numBytes);
+ } else {
+ user.incBytesOut(numBytes);
+ perTable.incBytesOut(numBytes);
+ }
+ }
+
+ @VisibleForTesting
+ int numMetrics() {
+ return metrics.size();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (isClosed.compareAndSet(false, true)) {
+ schedule.cancel(true);
+ for (AbstractMetricGroup metricGroup : metrics.values()) {
+ metricGroup.close();
+ }
+ metrics.clear();
+ }
+ }
+
+ /** A periodic task to clean up expired user metrics. */
+ private class ExpiredMetricCleanupTask implements Runnable {
+
+ @Override
+ public void run() {
+ for (Map.Entry<MetricKey, AbstractUserMetricGroup> metricEntry :
metrics.entrySet()) {
+ MetricKey metricName = metricEntry.getKey();
+ AbstractUserMetricGroup userMetric = metricEntry.getValue();
+ synchronized (userMetric) {
+ if (userMetric.hasExpired()) {
+ LOG.debug("Removing expired user metric [{}]",
metricName);
+ metrics.remove(metricName);
+ userMetric.close();
+ }
+ }
+ }
+ }
+ }
+
+ /** The key to identify user metrics. */
+ protected static class MetricKey {
+ final String userName;
+ @Nullable final TablePath tablePath;
+
+ MetricKey(String userName, @Nullable TablePath tablePath) {
+ this.userName = userName;
+ this.tablePath = tablePath;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MetricKey metricKey = (MetricKey) o;
+ return Objects.equals(userName, metricKey.userName)
+ && Objects.equals(tablePath, metricKey.tablePath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(userName, tablePath);
+ }
+
+ @Override
+ public String toString() {
+ if (tablePath == null) {
+ return userName;
+ } else {
+ return userName + ":" + tablePath;
+ }
+ }
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
new file mode 100644
index 000000000..080c72315
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/AbstractUserMetricGroup.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+import java.util.Map;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** Abstract metric the users in server and tracks the expiration. */
+public abstract class AbstractUserMetricGroup extends AbstractMetricGroup {
+ private static final String NAME = "user";
+
+ private final String principalName;
+ private final long inactiveMetricExpirationTimeMs;
+ protected final Counter bytesIn;
+ protected final Counter bytesOut;
+
+ /** The last record time for the group. */
+ protected volatile long lastRecordTime;
+
+ public AbstractUserMetricGroup(
+ MetricRegistry registry,
+ String principalName,
+ long inactiveMetricExpirationTimeMs,
+ TabletServerMetricGroup tabletServerMetricGroup) {
+ super(registry, makeScope(tabletServerMetricGroup, principalName),
tabletServerMetricGroup);
+ this.principalName = checkNotNull(principalName);
+ this.inactiveMetricExpirationTimeMs = inactiveMetricExpirationTimeMs;
+
+ this.bytesIn = new ThreadSafeSimpleCounter();
+ this.bytesOut = new ThreadSafeSimpleCounter();
+
+ this.lastRecordTime = System.currentTimeMillis();
+ }
+
+ @Override
+ protected String getGroupName(CharacterFilter filter) {
+ return NAME;
+ }
+
+ @VisibleForTesting
+ public String getPrincipalName() {
+ return principalName;
+ }
+
+ @Override
+ protected void putVariables(Map<String, String> variables) {
+ variables.put("user", principalName);
+ }
+
+ public void incBytesIn(long numBytes) {
+ this.lastRecordTime = System.currentTimeMillis();
+ bytesIn.inc(numBytes);
+ }
+
+ public void incBytesOut(long numBytes) {
+ this.lastRecordTime = System.currentTimeMillis();
+ bytesOut.inc(numBytes);
+ }
+
+ /** Return true if the metric is eligible for removal due to inactivity.
false otherwise. */
+ public boolean hasExpired() {
+ return (System.currentTimeMillis() - lastRecordTime) >
this.inactiveMetricExpirationTimeMs;
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
new file mode 100644
index 000000000..7a4a0f53e
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserMetricGroup.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metrics.MeterView;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+/**
+ * Metrics for the overall user level in server with {@link
TabletServerMetricGroup} as parent
+ * group.
+ */
+public class UserMetricGroup extends AbstractUserMetricGroup {
+
+ public UserMetricGroup(
+ MetricRegistry registry,
+ String principalName,
+ long inactiveMetricExpirationTimeMs,
+ TabletServerMetricGroup tabletServerMetricGroup) {
+ super(registry, principalName, inactiveMetricExpirationTimeMs,
tabletServerMetricGroup);
+
+ meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
+ meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
new file mode 100644
index 000000000..55d9cd82a
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/UserPerTableMetricGroup.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+import java.util.Map;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Metrics for the users per table in server with {@link
TabletServerMetricGroup} as parent group.
+ */
+public class UserPerTableMetricGroup extends AbstractUserMetricGroup {
+
+ private final TablePath tablePath;
+
+ public UserPerTableMetricGroup(
+ MetricRegistry registry,
+ String principalName,
+ TablePath tablePath,
+ long inactiveMetricExpirationTimeMs,
+ TabletServerMetricGroup tabletServerMetricGroup) {
+ super(registry, principalName, inactiveMetricExpirationTimeMs,
tabletServerMetricGroup);
+ this.tablePath = checkNotNull(tablePath);
+
+ // only track counters for per-table user metrics for billing purposes,
+ // the corresponding rates are tracked at the overall user level
+ counter(MetricNames.BYTES_IN, bytesIn);
+ counter(MetricNames.BYTES_OUT, bytesOut);
+ }
+
+ @Override
+ protected void putVariables(Map<String, String> variables) {
+ super.putVariables(variables);
+ variables.put("database", tablePath.getDatabaseName());
+ variables.put("table", tablePath.getTableName());
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 5eb19da65..d9350257d 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -67,6 +67,7 @@ import
org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
import org.apache.fluss.server.entity.NotifyRemoteLogOffsetsData;
import org.apache.fluss.server.entity.StopReplicaData;
import org.apache.fluss.server.entity.StopReplicaResultForBucket;
+import org.apache.fluss.server.entity.UserContext;
import org.apache.fluss.server.kv.KvManager;
import org.apache.fluss.server.kv.KvSnapshotResource;
import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter;
@@ -84,6 +85,7 @@ import
org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
import org.apache.fluss.server.log.remote.RemoteLogManager;
import org.apache.fluss.server.metadata.ClusterMetadata;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
+import org.apache.fluss.server.metrics.UserMetrics;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
import org.apache.fluss.server.metrics.group.TableMetricGroup;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
@@ -186,6 +188,7 @@ public class ReplicaManager {
// for metrics
private final TabletServerMetricGroup serverMetricGroup;
+ private final UserMetrics userMetrics;
private final String internalListenerName;
private final Clock clock;
@@ -203,6 +206,7 @@ public class ReplicaManager {
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
FatalErrorHandler fatalErrorHandler,
TabletServerMetricGroup serverMetricGroup,
+ UserMetrics userMetrics,
Clock clock,
ExecutorService ioExecutor)
throws IOException {
@@ -219,6 +223,7 @@ public class ReplicaManager {
completedKvSnapshotCommitter,
fatalErrorHandler,
serverMetricGroup,
+ userMetrics,
new RemoteLogManager(conf, zkClient, coordinatorGateway,
clock, ioExecutor),
clock,
ioExecutor);
@@ -238,6 +243,7 @@ public class ReplicaManager {
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
FatalErrorHandler fatalErrorHandler,
TabletServerMetricGroup serverMetricGroup,
+ UserMetrics userMetrics,
RemoteLogManager remoteLogManager,
Clock clock,
ExecutorService ioExecutor)
@@ -284,6 +290,7 @@ public class ReplicaManager {
zkClient, completedKvSnapshotCommitter,
kvSnapshotResource, conf);
this.remoteLogManager = remoteLogManager;
this.serverMetricGroup = serverMetricGroup;
+ this.userMetrics = userMetrics;
this.clock = clock;
this.ioExecutor = ioExecutor;
registerMetrics();
@@ -456,6 +463,7 @@ public class ReplicaManager {
int timeoutMs,
int requiredAcks,
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
+ @Nullable UserContext userContext,
Consumer<List<ProduceLogResultForBucket>> responseCallback) {
if (isRequiredAcksInvalid(requiredAcks)) {
throw new InvalidRequiredAcksException("Invalid required acks: " +
requiredAcks);
@@ -463,7 +471,7 @@ public class ReplicaManager {
long startTime = System.currentTimeMillis();
Map<TableBucket, ProduceLogResultForBucket> appendResult =
- appendToLocalLog(entriesPerBucket, requiredAcks);
+ appendToLocalLog(entriesPerBucket, requiredAcks, userContext);
LOG.debug("Append records to local log in {} ms",
System.currentTimeMillis() - startTime);
// maybe do delay write operation.
@@ -480,9 +488,11 @@ public class ReplicaManager {
public void fetchLogRecords(
FetchParams params,
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
+ @Nullable UserContext userContext,
Consumer<Map<TableBucket, FetchLogResultForBucket>>
responseCallback) {
long startTime = System.currentTimeMillis();
- Map<TableBucket, LogReadResult> logReadResults = readFromLog(params,
bucketFetchInfo);
+ Map<TableBucket, LogReadResult> logReadResults =
+ readFromLog(params, bucketFetchInfo, userContext);
if (LOG.isTraceEnabled()) {
LOG.trace(
"Fetch log records from local log in {} ms",
@@ -490,7 +500,8 @@ public class ReplicaManager {
}
// maybe do delay fetch log operation.
- maybeAddDelayedFetchLog(params, bucketFetchInfo, logReadResults,
responseCallback);
+ maybeAddDelayedFetchLog(
+ params, bucketFetchInfo, logReadResults, userContext,
responseCallback);
}
/**
@@ -925,18 +936,23 @@ public class ReplicaManager {
/** Append log records to leader replicas of the buckets. */
private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
- Map<TableBucket, MemoryLogRecords> entriesPerBucket, int
requiredAcks) {
+ Map<TableBucket, MemoryLogRecords> entriesPerBucket,
+ int requiredAcks,
+ @Nullable UserContext userContext) {
Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new
HashMap<>();
for (Map.Entry<TableBucket, MemoryLogRecords> entry :
entriesPerBucket.entrySet()) {
TableBucket tb = entry.getKey();
+ MemoryLogRecords records = entry.getValue();
TableMetricGroup tableMetrics = null;
try {
Replica replica = getReplicaOrException(tb);
tableMetrics = replica.tableMetrics();
tableMetrics.totalProduceLogRequests().inc();
+ // record user metrics before appending to log,
+ // so that if appending fails, we still account the bytes.
+ userMetrics.incBytesIn(userContext, replica.getTablePath(),
records.sizeInBytes());
LOG.trace("Append records to local log tablet for table bucket
{}", tb);
- LogAppendInfo appendInfo =
- replica.appendRecordsToLeader(entry.getValue(),
requiredAcks);
+ LogAppendInfo appendInfo =
replica.appendRecordsToLeader(records, requiredAcks);
long baseOffset = appendInfo.firstOffset();
LOG.trace(
@@ -1049,7 +1065,9 @@ public class ReplicaManager {
}
public Map<TableBucket, LogReadResult> readFromLog(
- FetchParams fetchParams, Map<TableBucket, FetchReqInfo>
bucketFetchInfo) {
+ FetchParams fetchParams,
+ Map<TableBucket, FetchReqInfo> bucketFetchInfo,
+ @Nullable UserContext userContext) {
Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
boolean isFromFollower = fetchParams.isFromFollower();
int limitBytes = fetchParams.maxFetchBytes();
@@ -1108,6 +1126,7 @@ public class ReplicaManager {
serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
} else {
tableMetrics.incLogBytesOut(recordBatchSize);
+ userMetrics.incBytesOut(userContext,
replica.getTablePath(), recordBatchSize);
}
} catch (Exception e) {
if (isUnexpectedException(e)) {
@@ -1297,6 +1316,7 @@ public class ReplicaManager {
FetchParams params,
Map<TableBucket, FetchReqInfo> bucketFetchInfo,
Map<TableBucket, LogReadResult> logReadResults,
+ @Nullable UserContext userContext,
Consumer<Map<TableBucket, FetchLogResultForBucket>>
responseCallback) {
long bytesReadable = 0;
boolean errorReadingData = false;
@@ -1363,7 +1383,8 @@ public class ReplicaManager {
delayedResponse.putAll(expectedErrorBuckets);
responseCallback.accept(delayedResponse);
},
- serverMetricGroup);
+ serverMetricGroup,
+ userContext);
// try to complete the request immediately, otherwise put it into
the
// delayedFetchLogManager; this is because while the delayed fetch
log operation is
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
index 9e0c49052..e17b7e1cc 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
@@ -28,6 +28,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.rpc.messages.FetchLogRequest;
import org.apache.fluss.server.entity.FetchReqInfo;
+import org.apache.fluss.server.entity.UserContext;
import org.apache.fluss.server.log.FetchIsolation;
import org.apache.fluss.server.log.FetchParams;
import org.apache.fluss.server.log.LogOffsetMetadata;
@@ -40,6 +41,8 @@ import
org.apache.fluss.server.replica.ReplicaManager.LogReadResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -59,19 +62,22 @@ public class DelayedFetchLog extends DelayedOperation {
private final Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap;
private final Consumer<Map<TableBucket, FetchLogResultForBucket>>
responseCallback;
private final TabletServerMetricGroup serverMetricGroup;
+ @Nullable private final UserContext userContext;
public DelayedFetchLog(
FetchParams params,
ReplicaManager replicaManager,
Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap,
Consumer<Map<TableBucket, FetchLogResultForBucket>>
responseCallback,
- TabletServerMetricGroup serverMetricGroup) {
+ TabletServerMetricGroup serverMetricGroup,
+ @Nullable UserContext userContext) {
super(params.maxWaitMs());
this.params = params;
this.replicaManager = replicaManager;
this.fetchBucketStatusMap = fetchBucketStatusMap;
this.responseCallback = responseCallback;
this.serverMetricGroup = serverMetricGroup;
+ this.userContext = userContext;
}
/** Upon completion, read whatever data is available and pass to the
complete callback. */
@@ -93,7 +99,7 @@ public class DelayedFetchLog extends DelayedOperation {
// re-fetch data.
Map<TableBucket, LogReadResult> reReadResult =
- replicaManager.readFromLog(params, reFetchBuckets);
+ replicaManager.readFromLog(params, reFetchBuckets,
userContext);
reReadResult.forEach((key, value) -> result.put(key,
value.getFetchLogResultForBucket()));
responseCallback.accept(result);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 88fa36ff4..bd185ec62 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -46,6 +46,7 @@ import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.remote.RemoteLogManager;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
import org.apache.fluss.server.metrics.ServerMetricUtils;
+import org.apache.fluss.server.metrics.UserMetrics;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.replica.ReplicaManager;
import org.apache.fluss.server.zk.ZooKeeperClient;
@@ -127,6 +128,9 @@ public class TabletServer extends ServerBase {
@GuardedBy("lock")
private MetricRegistry metricRegistry;
+ @GuardedBy("lock")
+ private UserMetrics userMetrics;
+
@GuardedBy("lock")
private TabletServerMetricGroup tabletServerMetricGroup;
@@ -194,6 +198,9 @@ public class TabletServer extends ServerBase {
List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf,
ServerType.TABLET_SERVER);
+ this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
+ scheduler.startup();
+
// for metrics
this.metricRegistry = MetricRegistry.create(conf, pluginManager);
this.tabletServerMetricGroup =
@@ -203,6 +210,7 @@ public class TabletServer extends ServerBase {
rack,
endpoints.get(0).getHost(),
serverId);
+ this.userMetrics = new UserMetrics(scheduler, metricRegistry,
tabletServerMetricGroup);
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
@@ -216,9 +224,6 @@ public class TabletServer extends ServerBase {
this.metadataCache = new
TabletServerMetadataCache(metadataManager);
- this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
- scheduler.startup();
-
this.logManager =
LogManager.create(conf, zkClient, scheduler, clock,
tabletServerMetricGroup);
logManager.startup();
@@ -262,6 +267,7 @@ public class TabletServer extends ServerBase {
rpcClient, metadataCache,
interListenerName),
this,
tabletServerMetricGroup,
+ userMetrics,
clock,
ioExecutor);
replicaManager.startup();
@@ -408,6 +414,10 @@ public class TabletServer extends ServerBase {
clientMetricGroup.close();
}
+ if (userMetrics != null) {
+ userMetrics.close();
+ }
+
// We must shut down the scheduler early because otherwise,
the scheduler could
// touch other resources that might have been shutdown and
cause exceptions.
if (scheduler != null) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 837bc1b23..9a47b6383 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -70,6 +70,7 @@ import org.apache.fluss.server.authorizer.Authorizer;
import org.apache.fluss.server.coordinator.MetadataManager;
import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
+import org.apache.fluss.server.entity.UserContext;
import org.apache.fluss.server.log.FetchParams;
import org.apache.fluss.server.log.ListOffsetsParam;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
@@ -169,6 +170,7 @@ public final class TabletService extends RpcServiceBase
implements TabletServerG
request.getTimeoutMs(),
request.getAcks(),
produceLogData,
+ new UserContext(currentSession().getPrincipal()),
bucketResponseMap ->
response.complete(makeProduceLogResponse(bucketResponseMap)));
return response;
}
@@ -193,6 +195,7 @@ public final class TabletService extends RpcServiceBase
implements TabletServerG
replicaManager.fetchLogRecords(
fetchParams,
interesting,
+ new UserContext(currentSession().getPrincipal()),
fetchResponseMap ->
response.complete(
makeFetchLogResponse(fetchResponseMap,
errorResponseMap)));
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
index a40dfe162..3187e5db5 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
@@ -133,6 +133,7 @@ public class RemoteLogITCase {
.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tableId,
0, 10240)),
+ null,
future::complete);
Map<TableBucket, FetchLogResultForBucket> result = future.get();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index 9c91890b6..e2e5be650 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -322,6 +322,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
replicaManager.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
future::complete);
Map<TableBucket, FetchLogResultForBucket> result = future.get();
assertThat(result.size()).isEqualTo(1);
@@ -341,6 +342,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
replicaManager.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
47, 1024 * 1024)),
+ null,
future::complete);
result = future.get();
assertThat(result.size()).isEqualTo(1);
@@ -379,6 +381,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
replicaManager.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
future::complete);
Map<TableBucket, FetchLogResultForBucket> result = future.get();
assertThat(result.size()).isEqualTo(1);
@@ -398,6 +401,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
replicaManager.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
35, 1024 * 1024)),
+ null,
future::complete);
result = future.get();
assertThat(result.size()).isEqualTo(1);
@@ -449,6 +453,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
replicaManager.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
20L, 1024 * 1024)),
+ null,
future::complete);
Map<TableBucket, FetchLogResultForBucket> result = future.get();
assertThat(result.get(tb).fetchFromRemote()).isFalse();
@@ -459,6 +464,7 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
replicaManager.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0, 1024 * 1024)),
+ null,
future::complete);
result = future.get();
assertThat(result.get(tb).fetchFromRemote()).isTrue();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
index 1f0d24d05..05f12a18b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
@@ -82,6 +82,7 @@ final class RemoteLogTTLTest extends RemoteLogTestBase {
replicaManager.fetchLogRecords(
new FetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
future::complete);
Map<TableBucket, FetchLogResultForBucket> result = future.get();
assertThat(result.size()).isEqualTo(1);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
new file mode 100644
index 000000000..f94205a5a
--- /dev/null
+++
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/UserMetricsTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.metrics;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.registry.NOPMetricRegistry;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.UserContext;
+import org.apache.fluss.server.metrics.group.AbstractUserMetricGroup;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static
org.apache.fluss.server.metrics.group.TestingMetricGroups.TABLET_SERVER_METRICS;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link UserMetrics}. */
+public class UserMetricsTest {
+
+ private FlussScheduler scheduler;
+
+ @BeforeEach
+ void before() {
+ scheduler = new FlussScheduler(2, false);
+ scheduler.startup();
+ }
+
+ @AfterEach
+ void after() throws InterruptedException {
+ scheduler.shutdown();
+ }
+
+ @Test
+ void testMetricExpiration() {
+ // set expiration time to 2s in test, and 10ms check interval
+ UserMetrics userMetrics =
+ new UserMetrics(
+ Duration.ofSeconds(2).toMillis(),
+ 100L,
+ scheduler,
+ NOPMetricRegistry.INSTANCE,
+ TABLET_SERVER_METRICS);
+
+ String user1 = "user1";
+ String user2 = "user2";
+ UserContext uc1 = new UserContext(new FlussPrincipal(user1, "USER"));
+ UserContext uc2 = new UserContext(new FlussPrincipal(user2, "USER"));
+
+ TablePath t1 = TablePath.of("db1", "table1");
+ TablePath t2 = TablePath.of("db1", "table2");
+
+ userMetrics.incBytesIn(null, t1, 100);
+ userMetrics.incBytesOut(null, t2, 200);
+ userMetrics.incBytesOut(new UserContext(FlussPrincipal.ANY), t1, 200);
+ userMetrics.incBytesIn(new UserContext(FlussPrincipal.ANONYMOUS), t1,
300);
+ userMetrics.incBytesOut(new
UserContext(FlussPrincipal.WILD_CARD_PRINCIPAL), t2, 100);
+
+ // ignore null/anonymous/wildcard user
+ assertThat(userMetrics.numMetrics()).isEqualTo(0);
+
+ userMetrics.incBytesIn(uc1, t1, 100);
+ userMetrics.incBytesOut(uc1, t2, 100);
+ userMetrics.incBytesOut(uc2, t2, 200);
+
+ // u1, u1+t1, u1+t2, u2, u2+t2
+ assertThat(userMetrics.numMetrics()).isEqualTo(5);
+
+ scheduler.schedule(
+ "updating-u1t1",
+ () -> {
+ userMetrics.incBytesIn(uc1, t1, 50);
+ },
+ 0L,
+ 50L);
+
+ AbstractUserMetricGroup u1t1 =
+ userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1,
t1));
+ AbstractUserMetricGroup u1t2 =
+ userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1,
t2));
+ AbstractUserMetricGroup u2t2 =
+ userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2,
t2));
+ AbstractUserMetricGroup u1 =
+ userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user1,
null));
+ AbstractUserMetricGroup u2 =
+ userMetrics.getOrCreateMetric(new UserMetrics.MetricKey(user2,
null));
+
+ // wait enough time for metrics expired
+ retry(
+ Duration.ofSeconds(10),
+ () -> {
+ // only u1 and u1+t1 are retained.
+ assertThat(userMetrics.numMetrics()).isEqualTo(2);
+ });
+
+ assertThat(u1t2.isClosed()).isTrue();
+ assertThat(u2t2.isClosed()).isTrue();
+ assertThat(u2.isClosed()).isTrue();
+
+ assertThat(u1.isClosed()).isFalse();
+ assertThat(u1t1.isClosed()).isFalse();
+ assertThat(userMetrics.getOrCreateMetric(new
UserMetrics.MetricKey(user1, null)))
+ .isSameAs(u1);
+ assertThat(userMetrics.getOrCreateMetric(new
UserMetrics.MetricKey(user1, t1)))
+ .isSameAs(u1t1);
+ }
+}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
index 15d1316b5..b2e93f2bb 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
@@ -19,6 +19,11 @@ package org.apache.fluss.server.metrics.group;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.registry.NOPMetricRegistry;
+import org.apache.fluss.server.metrics.UserMetrics;
+import org.apache.fluss.testutils.common.ScheduledTask;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import java.util.concurrent.ScheduledFuture;
/** Utilities for various metric groups for testing. */
public class TestingMetricGroups {
@@ -38,4 +43,30 @@ public class TestingMetricGroups {
public static final BucketMetricGroup BUCKET_METRICS =
new BucketMetricGroup(NOPMetricRegistry.INSTANCE, null, 0,
TABLE_METRICS);
+
+ public static final UserMetrics USER_METRICS =
+ new UserMetrics(
+ new TestingScheduler(), NOPMetricRegistry.INSTANCE,
TABLET_SERVER_METRICS);
+
+ //
------------------------------------------------------------------------------------------
+
+ private static class TestingScheduler implements Scheduler {
+ @Override
+ public void startup() {
+ // no-op
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ // no-op
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(
+ String name, Runnable task, long delayMs, long periodMs) {
+ // Directly run the task for testing purpose.
+ task.run();
+ return new ScheduledTask<>(() -> null, delayMs, periodMs);
+ }
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
index 64eade999..1779b96f8 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
@@ -67,6 +67,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
@@ -77,6 +78,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
2, (int)
conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()),
Collections.singletonMap(
tb, new FetchReqInfo(tb.getTableId(), 10L,
Integer.MAX_VALUE)),
+ null,
result -> {});
retry(
Duration.ofSeconds(20),
@@ -92,6 +94,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
3, (int)
conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()),
Collections.singletonMap(
tb, new FetchReqInfo(tb.getTableId(), 10L,
Integer.MAX_VALUE)),
+ null,
result -> {});
retry(
Duration.ofSeconds(20),
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 6ec71bc79..7db23130f 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -145,6 +145,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
@@ -154,6 +155,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 10L, 20L));
@@ -165,6 +167,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
100,
Collections.singletonMap(
tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
(result) -> {
// do nothing.
}))
@@ -179,6 +182,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(unknownBucket,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get())
.containsOnly(
@@ -203,6 +207,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
emptyFuture::complete);
Map<TableBucket, FetchLogResultForBucket> result = emptyFuture.get();
assertThat(result.size()).isEqualTo(1);
@@ -217,6 +222,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
@@ -226,6 +232,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
future1::complete);
result = future1.get();
assertThat(result.size()).isEqualTo(1);
@@ -241,6 +248,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
3L, 1024 * 1024)),
+ null,
future1::complete);
result = future1.get();
assertThat(result.size()).isEqualTo(1);
@@ -257,6 +265,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 10L, 20L));
@@ -265,6 +274,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
10L, 1024 * 1024)),
+ null,
future1::complete);
result = future1.get();
assertThat(result.size()).isEqualTo(1);
@@ -280,6 +290,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
100L, 1024 * 1024)),
+ null,
future1::complete);
result = future1.get();
assertThat(result.size()).isEqualTo(1);
@@ -297,6 +308,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
20L, 1024 * 1024)),
+ null,
future1::complete);
result = future1.get();
assertThat(result.size()).isEqualTo(1);
@@ -319,7 +331,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
int maxFetchBytesSize = batchSize + 10;
CompletableFuture<List<ProduceLogResultForBucket>> future = new
CompletableFuture<>();
replicaManager.appendRecordsToLog(
- 20000, 1, Collections.singletonMap(tb, records1),
future::complete);
+ 20000, 1, Collections.singletonMap(tb, records1), null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
// fetch from this bucket from offset 0 with fetch max bytes size
bigger that data1 batch
@@ -330,6 +342,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
buildFetchParams(-1, maxFetchBytesSize),
Collections.singletonMap(
tb, new FetchReqInfo(tb.getTableId(), 0L,
Integer.MAX_VALUE)),
+ null,
future1::complete);
Map<TableBucket, FetchLogResultForBucket> result = future1.get();
assertThat(result.size()).isEqualTo(1);
@@ -347,6 +360,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(ANOTHER_DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 10L, 20L));
@@ -356,6 +370,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
buildFetchParams(-1),
Collections.singletonMap(
tb, new FetchReqInfo(tb.getTableId(), 0,
Integer.MAX_VALUE)),
+ null,
future1::complete);
result = future1.get();
resultForBucket = result.get(tb);
@@ -374,6 +389,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
buildFetchParams(-1, maxFetchBytesSize),
Collections.singletonMap(
tb, new FetchReqInfo(tb.getTableId(), 0,
Integer.MAX_VALUE)),
+ null,
future1::complete);
result = future1.get();
resultForBucket = result.get(tb);
@@ -397,7 +413,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
Map<TableBucket, MemoryLogRecords> data = new HashMap<>();
data.put(tb1, genMemoryLogRecordsByObject(DATA1));
data.put(tb2, genMemoryLogRecordsByObject(DATA1));
- replicaManager.appendRecordsToLog(20000, 1, data, future::complete);
+ replicaManager.appendRecordsToLog(20000, 1, data, null,
future::complete);
assertThat(future.get())
.containsExactlyInAnyOrder(
new ProduceLogResultForBucket(tb1, 0, 10L),
@@ -408,7 +424,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
data = new HashMap<>();
data.put(tb1, genMemoryLogRecordsByObject(ANOTHER_DATA1));
data.put(tb2, genMemoryLogRecordsByObject(ANOTHER_DATA1));
- replicaManager.appendRecordsToLog(20000, 1, data, future::complete);
+ replicaManager.appendRecordsToLog(20000, 1, data, null,
future::complete);
assertThat(future.get())
.containsExactlyInAnyOrder(
new ProduceLogResultForBucket(tb1, 10L, 20L),
@@ -421,7 +437,8 @@ class ReplicaManagerTest extends ReplicaTestBase {
Map<TableBucket, FetchReqInfo> newFetchData = new HashMap<>();
newFetchData.put(tb1, new FetchReqInfo(tb1.getTableId(), 0,
Integer.MAX_VALUE));
newFetchData.put(tb2, new FetchReqInfo(tb2.getTableId(), 0,
Integer.MAX_VALUE));
- replicaManager.fetchLogRecords(buildFetchParams(-1, 10), newFetchData,
future1::complete);
+ replicaManager.fetchLogRecords(
+ buildFetchParams(-1, 10), newFetchData, null,
future1::complete);
Map<TableBucket, FetchLogResultForBucket> result = future1.get();
assertThat(result.size()).isEqualTo(2);
List<FetchLogResultForBucket> resultList = new
ArrayList<>(result.values());
@@ -533,6 +550,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
future1::complete);
FetchLogResultForBucket resultForBucket = future1.get().get(tb);
assertThat(resultForBucket.getHighWatermark()).isEqualTo(5L);
@@ -574,6 +592,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
future1::complete);
resultForBucket = future1.get().get(tb);
assertThat(resultForBucket.getHighWatermark()).isEqualTo(5L);
@@ -612,6 +631,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.fetchLogRecords(
buildFetchParams(-1),
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(),
0L, 1024 * 1024)),
+ null,
future1::complete);
resultForBucket = future1.get().get(tb);
assertThat(resultForBucket.getHighWatermark()).isEqualTo(8L);
@@ -667,6 +687,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
buildFetchParams(-1),
Collections.singletonMap(
tb, new FetchReqInfo(tb.getTableId(), 0L,
Integer.MAX_VALUE)),
+ null,
future1::complete);
FetchLogResultForBucket resultForBucket = future1.get().get(tb);
assertThat(resultForBucket.getHighWatermark()).isEqualTo(18L);
@@ -892,6 +913,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
// produce another batch to this bucket.
@@ -900,6 +922,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(ANOTHER_DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 10, 20L));
@@ -928,6 +951,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
@@ -984,6 +1008,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
20000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
future.get();
// advance clock to generate different batch commit timestamp.
@@ -1005,6 +1030,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
buildFetchParams(-1, Integer.MAX_VALUE),
Collections.singletonMap(
tb, new FetchReqInfo(tb.getTableId(), 0L,
Integer.MAX_VALUE)),
+ null,
future::complete);
Map<Long, Long> offsetToCommitTimestampMap =
startOffsetToBatchCommitTimestamp(future.get().get(tb));
@@ -1082,6 +1108,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
300000,
-1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
}
@@ -1809,7 +1836,8 @@ class ReplicaManagerTest extends ReplicaTestBase {
for (TableBucket tb : tableBuckets) {
fetchData.put(tb, new FetchReqInfo(tb.getTableId(), 0L, 1024 *
1024));
}
- replicaManager.fetchLogRecords(buildFetchParams(-1), fetchData,
fetchLogFuture::complete);
+ replicaManager.fetchLogRecords(
+ buildFetchParams(-1), fetchData, null,
fetchLogFuture::complete);
return fetchLogFuture.get();
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 81cc34aad..559353df6 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -303,6 +303,7 @@ public class ReplicaTestBase {
snapshotReporter,
NOPErrorHandler.INSTANCE,
TestingMetricGroups.TABLET_SERVER_METRICS,
+ TestingMetricGroups.USER_METRICS,
remoteLogManager,
manualClock,
ioExecutor);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
index 7822a0a4a..73d620842 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/delay/DelayedFetchLogTest.java
@@ -90,6 +90,7 @@ public class DelayedFetchLogTest extends ReplicaTestBase {
20000,
-1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
@@ -161,6 +162,7 @@ public class DelayedFetchLogTest extends ReplicaTestBase {
replicaManager,
Collections.singletonMap(tb, prevFetchBucketStatus),
responseCallback,
- TestingMetricGroups.TABLET_SERVER_METRICS);
+ TestingMetricGroups.TABLET_SERVER_METRICS,
+ null);
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
index 0b149abee..1e787a345 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java
@@ -157,6 +157,7 @@ public class ReplicaFetcherITCase {
replicaManager.fetchLogRecords(
new FetchParams(-1, false, Integer.MAX_VALUE, -1, -1),
Collections.singletonMap(tb, new FetchReqInfo(tableId, 0L,
1024 * 1024)),
+ null,
future::complete);
Map<TableBucket, FetchLogResultForBucket> result = future.get();
assertThat(result.size()).isEqualTo(1);
@@ -233,6 +234,7 @@ public class ReplicaFetcherITCase {
replicaManager.fetchLogRecords(
new FetchParams(-1, false, Integer.MAX_VALUE, -1, -1),
Collections.singletonMap(tb, new FetchReqInfo(tableId, 0L,
1024 * 1024)),
+ null,
future::complete);
Map<TableBucket, FetchLogResultForBucket> result = future.get();
assertThat(result.size()).isEqualTo(1);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 57f6be484..c857d84de 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -76,6 +76,7 @@ import static
org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static
org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
+import static
org.apache.fluss.server.metrics.group.TestingMetricGroups.USER_METRICS;
import static
org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH;
import static
org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH;
import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
@@ -152,6 +153,7 @@ public class ReplicaFetcherThreadTest {
1000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0, 10L));
@@ -175,6 +177,7 @@ public class ReplicaFetcherThreadTest {
1000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 10L, 20L));
retry(
@@ -208,6 +211,7 @@ public class ReplicaFetcherThreadTest {
1000,
1, // don't wait ack
Collections.singletonMap(tb,
genMemoryLogRecordsByObject(DATA1)),
+ null,
future::complete);
assertThat(future.get())
.containsOnly(new ProduceLogResultForBucket(tb,
baseOffset, baseOffset + 10L));
@@ -240,6 +244,7 @@ public class ReplicaFetcherThreadTest {
1,
Collections.singletonMap(
tb, genMemoryLogRecordsWithWriterId(DATA1,
writerId, i, 0)),
+ null,
future::complete);
assertThat(future.get())
.containsOnly(
@@ -274,6 +279,7 @@ public class ReplicaFetcherThreadTest {
1000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsWithWriterId(DATA1, 101L, 5, 100L)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 100L, 110L));
@@ -285,6 +291,7 @@ public class ReplicaFetcherThreadTest {
1000,
1,
Collections.singletonMap(tb,
genMemoryLogRecordsWithWriterId(DATA1, 100L, 5, 110L)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 110L, 120L));
retry(
@@ -306,6 +313,7 @@ public class ReplicaFetcherThreadTest {
1,
Collections.singletonMap(
tb, genMemoryLogRecordsWithWriterId(DATA1, writerId,
0, 0)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0L, 10L));
@@ -326,6 +334,7 @@ public class ReplicaFetcherThreadTest {
1,
Collections.singletonMap(
tb, genMemoryLogRecordsWithWriterId(DATA1, writerId,
1, 0)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 10L, 20L));
@@ -355,6 +364,7 @@ public class ReplicaFetcherThreadTest {
1,
Collections.singletonMap(
tb, genMemoryLogRecordsWithWriterId(DATA1, writerId,
2, 0)),
+ null,
future::complete);
assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 20L, 30L));
// now fetcher will work well since the state of writerId=101 is
established
@@ -469,6 +479,7 @@ public class ReplicaFetcherThreadTest {
new TestingCompletedKvSnapshotCommitter(),
NOPErrorHandler.INSTANCE,
serverMetricGroup,
+ USER_METRICS,
clock,
ioExecutor);
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
index ecae06d5d..67e8b0ac6 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/TestingLeaderEndpoint.java
@@ -92,6 +92,7 @@ public class TestingLeaderEndpoint implements LeaderEndpoint {
new FetchParams(
fetchLogRequest.getFollowerServerId(),
fetchLogRequest.getMaxBytes()),
fetchLogData,
+ null,
result ->
response.complete(
new FetchData(new FetchLogResponse(),
processResult(result))));
diff --git a/website/docs/maintenance/observability/monitor-metrics.md
b/website/docs/maintenance/observability/monitor-metrics.md
index 6f2eb5156..2d8ef8077 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -380,7 +380,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="29"><strong>tabletserver</strong></th>
+ <th rowspan="33"><strong>tabletserver</strong></th>
<td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="25">-</td>
<td>messagesInPerSecond</td>
<td>The number of messages written per second to this server.</td>
@@ -528,6 +528,27 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The physical remote log size managed by this TabletServer.</td>
<td>Gauge</td>
</tr>
+ <tr>
+ <td rowspan="4">user</td>
+ <td>bytesIn</td>
+ <td>The total number of bytes written to this server labeled with
<code>user</code> name and <code>database</code> name and <code>table</code>
name. </td>
+ <td>Counter</td>
+ </tr>
+ <tr>
+ <td>bytesOut</td>
+ <td>The total number of bytes read from this server labeled with
<code>user</code> name and <code>database</code> name and <code>table</code>
name. </td>
+ <td>Counter</td>
+ </tr>
+ <tr>
+ <td>bytesInPerSecond</td>
+ <td>The number of bytes written per second to this server labeled with
<code>user</code>.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>bytesOutPerSecond</td>
+ <td>The number of bytes read per second from this server labeled with
<code>user</code>.</td>
+ <td>Meter</td>
+ </tr>
</tbody>
</table>