Myasuka commented on code in PR #24050:
URL: https://github.com/apache/flink/pull/24050#discussion_r1451751194
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java:
##########
@@ -302,6 +303,20 @@ public class RocksDBNativeMetricOptions implements Serializable {
.withDescription(
"Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.");
+ public static final ConfigOption<Boolean> MONITOR_DB_GET =
+ ConfigOptions.key("state.backend.rocksdb.metrics.db_get")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the metric that measures the time taken for a RocksDB database to perform a read operation in microseconds");
+
+ public static final ConfigOption<Boolean> MONITOR_DB_WRITE =
+ ConfigOptions.key("state.backend.rocksdb.metrics.db_write")
Review Comment:
Same here, I think `state.backend.rocksdb.metrics.db-write` looks better.
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##########
@@ -231,4 +252,88 @@ public void update() {
setStatistics(this);
}
}
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database.
Review Comment:
This is a histogram, not a gauge; please update the Javadoc.
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##########
@@ -231,4 +252,88 @@ public void update() {
setStatistics(this);
}
}
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database.
+ */
+ class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram {
+ private final HistogramType histogramType;
+
+ private HistogramData histogramData;
+
+ private RocksDBNativeHistogramStatisticsMetricView(HistogramType histogramType) {
+ this.histogramType = histogramType;
+ }
+
+ void setValue(HistogramData histogramData) {
+ this.histogramData = histogramData;
+ }
+
+ @Override
+ public void update() {
+ setHistogramStatistics(this);
+ }
+
+ @Override
+ public void update(long value) {
+ }
+
+ @Override
+ public long getCount() {
+ return histogramData == null ? 0 : histogramData.getCount();
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return new RocksDBNativeHistogram();
+ }
+
+ class RocksDBNativeHistogram extends HistogramStatistics {
+
+ @Override
+ public double getQuantile(double quantile) {
+ if (histogramData == null) {
+ return 0;
+ } else if (quantile == 0.5) {
+ return histogramData.getMedian();
+ } else if (quantile == 0.95) {
+ return histogramData.getPercentile95();
+ } else if (quantile == 0.99) {
+ return histogramData.getPercentile99();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public long[] getValues() {
+ return new long[0];
+ }
+
+ @Override
+ public int size() {
+ return 0;
Review Comment:
Why not use `histogramData.getCount()`?
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java:
##########
@@ -302,6 +303,20 @@ public class RocksDBNativeMetricOptions implements Serializable {
.withDescription(
"Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.");
+ public static final ConfigOption<Boolean> MONITOR_DB_GET =
+ ConfigOptions.key("state.backend.rocksdb.metrics.db_get")
Review Comment:
Please follow the style of the other metric options in this class and use
`state.backend.rocksdb.metrics.db-get`.
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##########
@@ -231,4 +252,88 @@ public void update() {
setStatistics(this);
}
}
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database.
+ */
+ class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram {
+ private final HistogramType histogramType;
+
+ private HistogramData histogramData;
+
+ private RocksDBNativeHistogramStatisticsMetricView(HistogramType histogramType) {
+ this.histogramType = histogramType;
+ }
+
+ void setValue(HistogramData histogramData) {
+ this.histogramData = histogramData;
+ }
+
+ @Override
+ public void update() {
+ setHistogramStatistics(this);
+ }
+
+ @Override
+ public void update(long value) {
+ }
+
+ @Override
+ public long getCount() {
+ return histogramData == null ? 0 : histogramData.getCount();
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return new RocksDBNativeHistogram();
Review Comment:
The returned statistics should be a snapshot. However, the `HistogramStatistics`
currently returned would change over time, because the underlying
`histogramData` can also change.
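A minimal sketch of the snapshot idea, not the PR's implementation. `NativeHistogramData`, `HistogramSnapshot`, and `HistogramView` are hypothetical stand-ins (for `org.rocksdb.HistogramData`, a `HistogramStatistics` subclass, and the metric view respectively), kept free of Flink and RocksDB dependencies so the block compiles on its own. The values are copied when `getStatistics()` is called, so a later `setValue` call does not alter a snapshot that was already handed out.

```java
// Hypothetical stand-in for org.rocksdb.HistogramData, reduced to the getters used in this PR.
final class NativeHistogramData {
    private final double median;
    private final double p95;
    private final double p99;
    private final double average;
    private final long count;

    NativeHistogramData(double median, double p95, double p99, double average, long count) {
        this.median = median;
        this.p95 = p95;
        this.p99 = p99;
        this.average = average;
        this.count = count;
    }

    double getMedian() { return median; }
    double getPercentile95() { return p95; }
    double getPercentile99() { return p99; }
    double getAverage() { return average; }
    long getCount() { return count; }
}

// Hypothetical immutable snapshot: every value is copied once, at construction time.
final class HistogramSnapshot {
    private final double median;
    private final double p95;
    private final double p99;
    private final double mean;
    private final long count;

    HistogramSnapshot(NativeHistogramData data) {
        // A missing histogram is reported as all zeros, as in the reviewed code.
        this.median = data == null ? 0 : data.getMedian();
        this.p95 = data == null ? 0 : data.getPercentile95();
        this.p99 = data == null ? 0 : data.getPercentile99();
        this.mean = data == null ? 0 : data.getAverage();
        this.count = data == null ? 0 : data.getCount();
    }

    double getMedian() { return median; }
    double getPercentile95() { return p95; }
    double getPercentile99() { return p99; }
    double getMean() { return mean; }
    long getCount() { return count; }
}

// Hypothetical view: the monitor thread calls setValue, the reporter calls getStatistics.
final class HistogramView {
    private volatile NativeHistogramData histogramData;

    void setValue(NativeHistogramData histogramData) {
        this.histogramData = histogramData;
    }

    HistogramSnapshot getStatistics() {
        // Read the field once and copy, so the result no longer tracks later updates.
        return new HistogramSnapshot(histogramData);
    }
}
```

With this shape, a reporter that reads several values from one `HistogramSnapshot` sees them all from the same poll.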
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##########
@@ -231,4 +252,88 @@ public void update() {
setStatistics(this);
}
}
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database.
+ */
+ class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram {
+ private final HistogramType histogramType;
+
+ private HistogramData histogramData;
+
+ private RocksDBNativeHistogramStatisticsMetricView(HistogramType histogramType) {
+ this.histogramType = histogramType;
+ }
+
+ void setValue(HistogramData histogramData) {
+ this.histogramData = histogramData;
+ }
+
+ @Override
+ public void update() {
+ setHistogramStatistics(this);
+ }
+
+ @Override
+ public void update(long value) {
+ }
+
+ @Override
+ public long getCount() {
+ return histogramData == null ? 0 : histogramData.getCount();
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return new RocksDBNativeHistogram();
+ }
+
+ class RocksDBNativeHistogram extends HistogramStatistics {
+
+ @Override
+ public double getQuantile(double quantile) {
+ if (histogramData == null) {
+ return 0;
+ } else if (quantile == 0.5) {
+ return histogramData.getMedian();
+ } else if (quantile == 0.95) {
+ return histogramData.getPercentile95();
+ } else if (quantile == 0.99) {
+ return histogramData.getPercentile99();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public long[] getValues() {
+ return new long[0];
Review Comment:
Returning a series of zeros would not be good here.
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java:
##########
@@ -302,6 +303,20 @@ public class RocksDBNativeMetricOptions implements Serializable {
.withDescription(
"Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.");
+ public static final ConfigOption<Boolean> MONITOR_DB_GET =
+ ConfigOptions.key("state.backend.rocksdb.metrics.db_get")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the metric that measures the time taken for a RocksDB database to perform a read operation in microseconds");
+
+ public static final ConfigOption<Boolean> MONITOR_DB_WRITE =
+ ConfigOptions.key("state.backend.rocksdb.metrics.db_write")
Review Comment:
Why did we choose only `db-get` and `db-write` here? MapState would also
introduce `db-seek` operations.
##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java:
##########
@@ -202,6 +220,17 @@ void testClosedGaugesDontRead() throws RocksDBException {
.withFailMessage("Closed gauge still queried RocksDB")
.isZero();
}
+
+ for (RocksDBNativeMetricMonitor.RocksDBNativeHistogramStatisticsMetricView view :
+ registry.statisticsHistogramMetrics) {
+ view.close();
+ view.update();
+ Assert.assertEquals(
+ "Closed gauge still queried RocksDB",
Review Comment:
This is a histogram view, not a gauge; please update the failure message.
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##########
@@ -231,4 +252,88 @@ public void update() {
setStatistics(this);
}
}
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database.
+ */
+ class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram {
+ private final HistogramType histogramType;
+
+ private HistogramData histogramData;
+
+ private RocksDBNativeHistogramStatisticsMetricView(HistogramType histogramType) {
+ this.histogramType = histogramType;
+ }
+
+ void setValue(HistogramData histogramData) {
+ this.histogramData = histogramData;
+ }
+
+ @Override
+ public void update() {
+ setHistogramStatistics(this);
+ }
+
+ @Override
+ public void update(long value) {
+ }
+
+ @Override
+ public long getCount() {
+ return histogramData == null ? 0 : histogramData.getCount();
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return new RocksDBNativeHistogram();
+ }
+
+ class RocksDBNativeHistogram extends HistogramStatistics {
+
+ @Override
+ public double getQuantile(double quantile) {
Review Comment:
Flink reports the `p50`, `p75`, `p95`, `p98`, `p99` and `p999` quantiles, so I
don't think returning zero for the unsupported ones would be a good idea here.
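One possible way to avoid returning zero, sketched under loud assumptions: approximate the missing quantiles by linear interpolation between the values RocksDB does expose. `QuantileEstimator` is a hypothetical helper, not part of Flink or RocksDB, and it assumes that minimum and maximum values are available alongside the median, p95, and p99. The result is only an estimate, so whether it is acceptable for reporting is a separate decision.

```java
// Hypothetical helper: estimates arbitrary quantiles from a few known anchor points.
final class QuantileEstimator {
    // Anchor quantiles in ascending order and the values observed at them.
    private final double[] anchors;
    private final double[] values;

    QuantileEstimator(double min, double median, double p95, double p99, double max) {
        this.anchors = new double[] {0.0, 0.5, 0.95, 0.99, 1.0};
        this.values = new double[] {min, median, p95, p99, max};
    }

    double getQuantile(double quantile) {
        if (Double.isNaN(quantile) || quantile < 0.0 || quantile > 1.0) {
            throw new IllegalArgumentException("quantile must be in [0, 1]: " + quantile);
        }
        // Find the first anchor at or above the requested quantile and
        // interpolate linearly between it and the previous anchor.
        for (int i = 1; i < anchors.length; i++) {
            if (quantile <= anchors[i]) {
                double fraction = (quantile - anchors[i - 1]) / (anchors[i] - anchors[i - 1]);
                return values[i - 1] + fraction * (values[i] - values[i - 1]);
            }
        }
        return values[values.length - 1];
    }
}
```

Requests that hit an anchor exactly (0.5, 0.95, 0.99) return the native value unchanged; only `p75`, `p98`, and `p999` are estimated.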
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##########
@@ -231,4 +252,88 @@ public void update() {
setStatistics(this);
}
}
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database.
+ */
+ class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram {
+ private final HistogramType histogramType;
+
+ private HistogramData histogramData;
+
+ private RocksDBNativeHistogramStatisticsMetricView(HistogramType histogramType) {
+ this.histogramType = histogramType;
+ }
+
+ void setValue(HistogramData histogramData) {
+ this.histogramData = histogramData;
+ }
+
+ @Override
+ public void update() {
+ setHistogramStatistics(this);
+ }
+
+ @Override
+ public void update(long value) {
Review Comment:
We should add a comment explaining why this method is empty.
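A small sketch of what such a comment could look like. `UpdatableHistogram` and `NativeBackedHistogram` are hypothetical, dependency-free stand-ins for Flink's `Histogram` interface and the view in this PR, and the stated reason (samples are recorded inside RocksDB and only polled from the Java side) is an assumption about the design that the author should confirm.

```java
// Hypothetical stand-in for the relevant part of Flink's Histogram interface.
interface UpdatableHistogram {
    void update(long value);

    long getCount();
}

// Hypothetical histogram whose data comes from polling native statistics.
final class NativeBackedHistogram implements UpdatableHistogram {
    private long nativeCount;

    // Called by the (hypothetical) monitor after it polls the native statistics.
    void setNativeCount(long nativeCount) {
        this.nativeCount = nativeCount;
    }

    /**
     * Intentionally a no-op: samples are recorded by the native library itself and
     * are only read back periodically, so values pushed from the Java side are ignored.
     */
    @Override
    public void update(long value) {
        // No-op by design, see the Javadoc above.
    }

    @Override
    public long getCount() {
        return nativeCount;
    }
}
```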
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##########
@@ -231,4 +252,88 @@ public void update() {
setStatistics(this);
}
}
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database.
+ */
+ class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram {
+ private final HistogramType histogramType;
+
+ private HistogramData histogramData;
+
+ private RocksDBNativeHistogramStatisticsMetricView(HistogramType histogramType) {
+ this.histogramType = histogramType;
+ }
+
+ void setValue(HistogramData histogramData) {
+ this.histogramData = histogramData;
+ }
+
+ @Override
+ public void update() {
+ setHistogramStatistics(this);
+ }
+
+ @Override
+ public void update(long value) {
+ }
+
+ @Override
+ public long getCount() {
+ return histogramData == null ? 0 : histogramData.getCount();
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return new RocksDBNativeHistogram();
+ }
+
+ class RocksDBNativeHistogram extends HistogramStatistics {
+
+ @Override
+ public double getQuantile(double quantile) {
+ if (histogramData == null) {
+ return 0;
+ } else if (quantile == 0.5) {
+ return histogramData.getMedian();
+ } else if (quantile == 0.95) {
+ return histogramData.getPercentile95();
+ } else if (quantile == 0.99) {
+ return histogramData.getPercentile99();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public long[] getValues() {
+ return new long[0];
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public double getMean() {
+ return histogramData == null ? 0 : histogramData.getAverage();
Review Comment:
The `histogramData` could be a non-null value.