luoyuxia commented on code in PR #1729:
URL: https://github.com/apache/fluss/pull/1729#discussion_r2367017507
##########
fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java:
##########
@@ -132,6 +132,8 @@ public class MetricNames {
public static final String LOG_NUM_SEGMENTS = "numSegments";
public static final String LOG_END_OFFSET = "endOffset";
public static final String REMOTE_LOG_SIZE = "size";
+ public static final String LOG_LAKE_LAG_RECORDS = "lakeLagRecords";
Review Comment:
I don't have a good name in mind; maybe:
```suggestion
// for lake tiering
public static final String LOG_LAKE_PENDING_RECORDS = "pendingRecords";
```
Inspired by Flink:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
##########
fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java:
##########
@@ -132,6 +132,8 @@ public class MetricNames {
public static final String LOG_NUM_SEGMENTS = "numSegments";
public static final String LOG_END_OFFSET = "endOffset";
public static final String REMOTE_LOG_SIZE = "size";
+ public static final String LOG_LAKE_LAG_RECORDS = "lakeLagRecords";
+ public static final String LOG_LAKE_LAG_TIME_MS = "lakeLagTimestampMs";
Review Comment:
```suggestion
public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag";
```
##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java:
##########
@@ -346,6 +346,18 @@ public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
metricGroup.gauge(
MetricNames.LOG_NUM_SEGMENTS, () -> localLog.getSegments().numberOfSegments());
metricGroup.gauge(MetricNames.LOG_END_OFFSET, localLog::getLocalLogEndOffset);
+ metricGroup.gauge(
Review Comment:
Don't register this metric in the tablet-level group. One bucket will have
3 log tablets, so registering it here gives us 3 lag metrics. We only need
one: only the leader should report it.
##########
website/docs/maintenance/observability/monitor-metrics.md:
##########
@@ -757,6 +757,16 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
<td>endOffset</td>
<td>The end offset in local storage for this table bucket.</td>
<td>Gauge</td>
+ </tr>
+ <tr>
+ <td>lakeLagRecords</td>
Review Comment:
If you modify this according to my suggestion, the `infix` should be
`table_bucket_lakeTiering`.
##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java:
##########
@@ -346,6 +346,18 @@ public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
metricGroup.gauge(
MetricNames.LOG_NUM_SEGMENTS, () -> localLog.getSegments().numberOfSegments());
metricGroup.gauge(MetricNames.LOG_END_OFFSET, localLog::getLocalLogEndOffset);
+ metricGroup.gauge(
Review Comment:
Also, only datalake-enabled tables should report these metrics; other tables
shouldn't report them.
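To illustrate the two constraints from these comments (leader only, datalake-enabled tables only), here is a minimal toy model. `ReplicaMetricsSketch` and all of its members are hypothetical and are not Fluss classes; the gauge names are the ones proposed earlier in this review, and a plain `Set` stands in for a real metric group.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a replica; it only models when the lake gauges exist. */
final class ReplicaMetricsSketch {
    private final boolean dataLakeEnabled;
    // Stand-in for a metric group: just the names of the registered gauges.
    private final Set<String> lakeGauges = new HashSet<>();

    ReplicaMetricsSketch(boolean dataLakeEnabled) {
        this.dataLakeEnabled = dataLakeEnabled;
    }

    /** Register the lake gauges only on the leader of a datalake-enabled table. */
    void onBecomeLeader() {
        if (dataLakeEnabled) {
            lakeGauges.add("pendingRecords");
            lakeGauges.add("timestampLag");
        }
    }

    /** A follower must not report lake lag, so drop whatever was registered. */
    void onBecomeFollower() {
        lakeGauges.clear();
    }

    int lakeGaugeCount() {
        return lakeGauges.size();
    }
}
```

With three replicas of one bucket, only the replica that went through `onBecomeLeader()` ends up holding gauges, so the bucket reports a single set of lag metrics.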
##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java:
##########
@@ -346,6 +346,18 @@ public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
metricGroup.gauge(
MetricNames.LOG_NUM_SEGMENTS, () -> localLog.getSegments().numberOfSegments());
metricGroup.gauge(MetricNames.LOG_END_OFFSET, localLog::getLocalLogEndOffset);
+ metricGroup.gauge(
Review Comment:
I suggest doing it like this:
```
private void onBecomeNewLeader() {
    updateLeaderEndOffsetSnapshot();
    if (isDataLakeEnabled()) {
        registerLakeTieringMetrics();
    }
    if (isKvTable()) {
        // if it's become new leader, we must first destroy the old kv tablet
        // if exist. Otherwise, it'll use still the old kv tablet which
        // will cause data loss
        dropKv();
        // now, we can create a new kv tablet
        createKv();
    }
}

private void registerLakeTieringMetrics() {
    lakeTieringMetricGroup = bucketMetricGroup.addGroup("lakeTiering");
    lakeTieringMetricGroup.gauge(
            MetricNames.LOG_LAKE_LAG_RECORDS,
            () ->
                    getLakeLogEndOffset() < 0L
                            ? getLogHighWatermark()
                            : getLogHighWatermark() - getLakeLogEndOffset());
    lakeTieringMetricGroup.gauge(
            MetricNames.LOG_LAKE_LAG_TIME_MS,
            () ->
                    logTablet.getLakeMaxTimestamp() < 0L
                            ? logTablet.getLakeMaxTimestamp()
                            : logTablet.localMaxTimestamp() - logTablet.getLakeMaxTimestamp());
}

private void onBecomeNewFollower() {
    if (isKvTable()) {
        // it should be from leader to follower, we need to destroy the kv tablet
        dropKv();
    }
    if (lakeTieringMetricGroup != null) {
        lakeTieringMetricGroup.close();
    }
}
```
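To make the gauge arithmetic in the snippet above easier to check in isolation, here is a minimal standalone sketch. `LakeLagSketch` and its method names are hypothetical and not part of Fluss; the expressions mirror the two gauges as written, including passing a negative lake timestamp through unchanged, which the final PR may handle differently.

```java
/** Hypothetical helper, not part of Fluss: the lag arithmetic from the snippet above. */
final class LakeLagSketch {

    private LakeLagSketch() {}

    /**
     * Records not yet tiered to the lake. A negative lake end offset is read as
     * "nothing tiered yet", so everything up to the high watermark is pending.
     */
    static long pendingRecords(long highWatermark, long lakeLogEndOffset) {
        return lakeLogEndOffset < 0L ? highWatermark : highWatermark - lakeLogEndOffset;
    }

    /**
     * Gap between the newest local timestamp and the newest tiered timestamp.
     * A negative lake timestamp is returned as-is, as in the snippet.
     */
    static long timestampLag(long localMaxTimestamp, long lakeMaxTimestamp) {
        return lakeMaxTimestamp < 0L ? lakeMaxTimestamp : localMaxTimestamp - lakeMaxTimestamp;
    }
}
```

For example, with a high watermark of 100 and a lake end offset of 40, `pendingRecords` yields 60. Keeping the arithmetic in pure functions like this would also make it straightforward to unit test apart from the metric registration.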