wuchong commented on code in PR #2080:
URL: https://github.com/apache/fluss/pull/2080#discussion_r2623513164
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -450,14 +451,15 @@ public void appendRecordsToLog(
int timeoutMs,
int requiredAcks,
Map<TableBucket, MemoryLogRecords> entriesPerBucket,
+ UserContext userContext,
Review Comment:
Add `@Nullable` to the parameter if it may pass `null` parameter (even if
only tests pass `null`). Same to other methods.
##########
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java:
##########
@@ -43,6 +47,8 @@ public class TableMetricGroup extends AbstractMetricGroup {
private final Map<TableBucket, BucketMetricGroup> buckets = new
HashMap<>();
+ private final Map<String, UserMetricGroup> userMetricGroups =
MapUtils.newConcurrentHashMap();
Review Comment:
I don't see cleanup logic for this. This may lead to memory leak or metrics
reporting overhead for idle users. In kafka, there is a
`ClientQuotaManager.InactiveSensorExpirationTimeSeconds` to cleanup inactive
user metrics. I think we need a similar mechenism.
##########
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java:
##########
@@ -222,6 +242,42 @@ public Counter failedPrefixLookupRequests() {
}
}
+ // ------------------------------------------------------------------------
+ // user groups
+ // ------------------------------------------------------------------------
+ private UserMetricGroup getOrCreateUserMetricGroup(String principalName) {
Review Comment:
IIUC, we only need to collect user-level IO metrics, rather than (user *
table)-level. Then I think we can extract this out of `TableMetricGroup` into a
global `UserMetrics` (which maintains a map of user to `UserMetricGroup`) in
`ReplicaManager`. This will also help us to implement user quota in the future,
which has similar implementation in kafka.
##########
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java:
##########
@@ -222,6 +242,42 @@ public Counter failedPrefixLookupRequests() {
}
}
+ // ------------------------------------------------------------------------
+ // user groups
+ // ------------------------------------------------------------------------
+ private UserMetricGroup getOrCreateUserMetricGroup(String principalName) {
+ return userMetricGroups.computeIfAbsent(
+ principalName, name -> new UserMetricGroup(this,
principalName));
+ }
+
+ private static class UserMetricGroup extends AbstractMetricGroup {
+ private final String principalName;
+ protected final Counter bytesIn;
+ protected final Counter bytesOut;
+
+ private UserMetricGroup(TableMetricGroup tableMetricGroup, String
principalName) {
+ super(
+ tableMetricGroup.registry,
+ makeScope(tableMetricGroup, principalName),
+ tableMetricGroup);
+ this.principalName = principalName;
+ bytesIn = new ThreadSafeSimpleCounter();
+ meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
+ bytesOut = new ThreadSafeSimpleCounter();
+ meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
+ }
+
+ @Override
+ protected String getGroupName(CharacterFilter filter) {
+ return "user";
+ }
+
+ @Override
+ protected void putVariables(Map<String, String> variables) {
+ variables.put("name", principalName);
Review Comment:
Use `user` as the variable name would be more intuitive? Name may confuse
it's table name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]