This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 09a7f32c172 IGNITE-26182 Add metrics for log storage sizes (#7647)
09a7f32c172 is described below
commit 09a7f32c1720d00bd776e916b8fd51a14379da51
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Feb 24 16:51:58 2026 +0400
IGNITE-26182 Add metrics for log storage sizes (#7647)
---
.../ignite/internal/cli/CliIntegrationTest.java | 1 +
.../raftsnapshot/ItLogStorageMetricsTest.java | 88 +++++++++++
.../storage/impl/DefaultLogStorageManager.java | 11 ++
.../storage/impl/DefaultLogStorageManagerTest.java | 15 +-
.../rest/metrics/ItMetricControllerTest.java | 1 +
modules/runner/build.gradle | 2 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 13 ++
.../metrics/logstorage/LogStorageMetricSource.java | 96 ++++++++++++
.../metrics/logstorage/LogStorageMetrics.java | 157 +++++++++++++++++++
.../logstorage/LogStorageMetricSourceTest.java | 101 ++++++++++++
.../metrics/logstorage/LogStorageMetricsTest.java | 169 +++++++++++++++++++++
11 files changed, 642 insertions(+), 12 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 00eae74a96f..11601cfa5a2 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -87,6 +87,7 @@ public abstract class CliIntegrationTest extends
ClusterPerClassIntegrationTest
new MetricSource().name("index.builder").enabled(true),
new MetricSource().name("raft.snapshots").enabled(true),
new MetricSource().name("messaging").enabled(true),
+ new MetricSource().name("log.storage").enabled(true),
new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.default").enabled(true),
new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.deploymentunits").enabled(true),
new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.scalecube").enabled(true),
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItLogStorageMetricsTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItLogStorageMetricsTest.java
new file mode 100644
index 00000000000..8ff4a645a3e
--- /dev/null
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItLogStorageMetricsTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Random;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ItLogStorageMetricsTest extends ClusterPerTestIntegrationTest {
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ private Ignite node;
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeEach
+ void prepare() {
+ node = cluster.node(0);
+ }
+
+ @Test
+ void totalLogStorageMetricIsUpdated() throws Exception {
+ LongGauge totalLogStorageSize = totalLogStorageSizeGauge();
+
+ int valueLength = 1_000_000;
+
+ feedLogStorageWithBlob(valueLength);
+
+ await().alias("Total log storage size should reach the expected value")
+ .until(totalLogStorageSize::value,
is(greaterThanOrEqualTo((long) valueLength)));
+ }
+
+ private LongGauge totalLogStorageSizeGauge() {
+ MetricSet logStorageMetrics = unwrapIgniteImpl(node)
+ .metricManager()
+ .metricSnapshot()
+ .metrics()
+ .get("log.storage");
+ assertThat(logStorageMetrics, is(notNullValue()));
+
+ LongGauge totalLogStorageSize =
logStorageMetrics.get("TotalLogStorageSize");
+ assertThat(totalLogStorageSize, is(notNullValue()));
+
+ return totalLogStorageSize;
+ }
+
+ private void feedLogStorageWithBlob(int valueLength) throws Exception {
+ node.sql().executeScript("CREATE TABLE " + TABLE_NAME + "(ID INT PRIMARY KEY, VAL VARBINARY(" + valueLength + "))");
+
+ node.tables().table(TABLE_NAME)
+ .keyValueView(Integer.class, byte[].class)
+ .put(1, randomBytes(new Random(), valueLength));
+
+ DefaultLogStorageManager logStorageManager =
(DefaultLogStorageManager) unwrapIgniteImpl(node).partitionsLogStorageManager();
+ logStorageManager.flushSstFiles();
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
index 06afc56eddb..84123ef59f5 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
@@ -62,6 +62,7 @@ import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
+import org.rocksdb.FlushOptions;
import org.rocksdb.Priority;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@@ -436,6 +437,16 @@ public class DefaultLogStorageManager implements
LogStorageManager {
return db;
}
+ /**
+ * Flushes in-memory data of the meta, conf and data column families to SST files on disk, waiting for the flush to complete.
+ */
+ @TestOnly
+ public void flushSstFiles() throws Exception {
+ try (var flushOptions = new FlushOptions().setWaitForFlush(true)) {
+ db.flush(flushOptions, List.of(metaHandle, confHandle,
dataHandle));
+ }
+ }
+
@TestOnly
ColumnFamilyHandle metaColumnFamilyHandle() {
return metaHandle;
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
index 18368c431d9..bee12132c89 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
@@ -57,8 +57,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.RocksDBException;
@ExtendWith(WorkDirectoryExtension.class)
class DefaultLogStorageManagerTest {
@@ -76,7 +74,7 @@ class DefaultLogStorageManagerTest {
logStorageOptions.setConfigurationManager(new ConfigurationManager());
logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance());
- boolean disableFsync =
testInfo.getTestMethod().get().isAnnotationPresent(DisableFsync.class);
+ boolean disableFsync =
testInfo.getTestMethod().orElseThrow().isAnnotationPresent(DisableFsync.class);
logStorageManager = new DefaultLogStorageManager("test", "test",
workDir, !disableFsync);
startFactory();
@@ -308,7 +306,7 @@ class DefaultLogStorageManagerTest {
}
@Test
- void totalBytesOnDiskAccountsForWal() throws Exception {
+ void totalBytesOnDiskAccountsForWal() {
int entrySize = 1000;
long originalSize = logStorageManager.totalBytesOnDisk();
@@ -331,17 +329,10 @@ class DefaultLogStorageManagerTest {
logStorage.appendEntry(dataLogEntry(1, randomBytes(new Random(),
entrySize)));
// Make sure SST files are accounted for.
- flushSstFiles();
+ logStorageManager.flushSstFiles();
assertThat(logStorageManager.totalBytesOnDisk(),
is(greaterThanOrEqualTo(originalSize + entrySize)));
}
- private void flushSstFiles() throws RocksDBException {
- try (var flushOptions = new FlushOptions().setWaitForFlush(true)) {
- //noinspection resource
- logStorageManager.db().flush(flushOptions);
- }
- }
-
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
private @interface DisableFsync {
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 6f2ee5385ac..3cde2837d0f 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -75,6 +75,7 @@ class ItMetricControllerTest extends
ClusterPerClassIntegrationTest {
new MetricSource("index.builder", true),
new MetricSource("raft.snapshots", true),
new MetricSource("messaging", true),
+ new MetricSource("log.storage", true),
new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.default", true),
new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.deploymentunits", true),
new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.scalecube", true),
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 0ee64387313..db94bccfa69 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -121,12 +121,14 @@ dependencies {
testImplementation testFixtures(project(':ignite-vault'))
testImplementation testFixtures(project(':ignite-metastorage'))
testImplementation testFixtures(project(':ignite-jdbc'))
+ testImplementation testFixtures(project(':ignite-metrics'))
testImplementation(libs.jsonpath.assert) {
//IDEA test runner doesn't apply Gradle dependency resolve strategy, this is just not implemented
//So, exclude asm-core transitive dependency to protect of jar-hell.
exclude group: 'org.ow2.asm', module: 'asm'
}
testImplementation libs.auto.service.annotations
+ testImplementation libs.awaitility
integrationTestAnnotationProcessor
project(':ignite-configuration-annotation-processor')
integrationTestAnnotationProcessor libs.jmh.annotation.processor
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0727c4ab1bd..c91a98f304c 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -171,6 +171,7 @@ import
org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricManagerImpl;
import
org.apache.ignite.internal.metrics.configuration.MetricExtensionConfiguration;
+import org.apache.ignite.internal.metrics.logstorage.LogStorageMetrics;
import org.apache.ignite.internal.metrics.messaging.MetricMessaging;
import org.apache.ignite.internal.metrics.sources.ClockServiceMetricSource;
import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
@@ -451,6 +452,8 @@ public class IgniteImpl implements Ignite {
/** Creator for volatile {@link LogStorageManager} instances. */
private final VolatileLogStorageManagerCreator
volatileLogStorageManagerCreator;
+ private final LogStorageMetrics logStorageMetrics;
+
private final SystemPropertiesComponent systemPropertiesComponent;
/** A hybrid logical clock. */
@@ -925,6 +928,15 @@ public class IgniteImpl implements Ignite {
volatileLogStorageManagerCreator = new
VolatileLogStorageManagerCreator(name,
workDir.resolve("volatile-log-spillout"));
+ logStorageMetrics = new LogStorageMetrics(
+ name,
+ metricManager,
+ cmgLogStorageManager,
+ msLogStorageManager,
+ partitionsLogStorageManager,
+ volatileLogStorageManagerCreator
+ );
+
schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
@@ -1630,6 +1642,7 @@ public class IgniteImpl implements Ignite {
distributionZoneManager,
computeComponent,
volatileLogStorageManagerCreator,
+ logStorageMetrics,
replicaMgr,
indexNodeFinishedRwTransactionsChecker,
txManager,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSource.java
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSource.java
new file mode 100644
index 00000000000..35e66bb3b23
--- /dev/null
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSource.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSource;
+import
org.apache.ignite.internal.metrics.logstorage.LogStorageMetricSource.Holder;
+
+/**
+ * {@link MetricSource} for log storage metrics.
+ */
+class LogStorageMetricSource extends AbstractMetricSource<Holder> {
+ static final String NAME = "log.storage";
+
+ private volatile long cmgLogStorageSizeBytes;
+ private volatile long metastorageLogStorageSizeBytes;
+ private volatile long partitionsLogStorageSizeBytes;
+
+ LogStorageMetricSource() {
+ super(NAME, "Log storage metrics.");
+ }
+
+ void cmgLogStorageSize(long newSize) {
+ this.cmgLogStorageSizeBytes = newSize;
+ }
+
+ void metastorageLogStorageSize(long newSize) {
+ this.metastorageLogStorageSizeBytes = newSize;
+ }
+
+ void partitionsLogStorageSize(long newSize) {
+ this.partitionsLogStorageSizeBytes = newSize;
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ protected class Holder implements AbstractMetricSource.Holder<Holder> {
+ private final LongMetric cmgLogStorageSize = new LongGauge(
+ "CmgLogStorageSize",
+ "Number of bytes occupied on disk by the CMG log.",
+ () -> cmgLogStorageSizeBytes
+ );
+
+ private final LongMetric metastorageLogStorageSize = new LongGauge(
+ "MetastorageLogStorageSize",
+ "Number of bytes occupied on disk by the Metastorage group log.",
+ () -> metastorageLogStorageSizeBytes
+ );
+
+ private final LongMetric partitionsLogStorageSize = new LongGauge(
+ "PartitionsLogStorageSize",
+ "Number of bytes occupied on disk by the partitions groups logs.",
+ () -> partitionsLogStorageSizeBytes
+ );
+
+ private final LongMetric totalLogStorageSize = new LongGauge(
+ "TotalLogStorageSize",
+ "Number of bytes occupied on disk by logs of all replication groups.",
+ () -> cmgLogStorageSizeBytes + metastorageLogStorageSizeBytes + partitionsLogStorageSizeBytes
+ );
+
+ private final List<Metric> metrics = List.of(
+ cmgLogStorageSize,
+ metastorageLogStorageSize,
+ partitionsLogStorageSize,
+ totalLogStorageSize
+ );
+
+ @Override
+ public Iterable<Metric> metrics() {
+ return metrics;
+ }
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetrics.java
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetrics.java
new file mode 100644
index 00000000000..f5f073b26cf
--- /dev/null
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetrics.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.raft.storage.LogStorageManager;
+import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageManagerCreator;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+
+/**
+ * Component that periodically measures on-disk sizes of log storages and publishes them as metrics.
+ */
+public class LogStorageMetrics implements IgniteComponent {
+ private static final IgniteLogger LOG =
Loggers.forClass(LogStorageMetrics.class);
+
+ private static final long DEFAULT_UPDATE_PERIOD_MS = 1000;
+
+ private final String nodeName;
+
+ private final MetricManager metricManager;
+
+ private final LogStorageManager cmgLogStorageManager;
+ private final LogStorageManager metastorageLogStorageManager;
+ private final LogStorageManager partitionsLogStorageManager;
+ private final VolatileLogStorageManagerCreator
volatileLogStorageManagerCreator;
+
+ private final long updatePeriodMs;
+
+ private final LogStorageMetricSource metricSource = new
LogStorageMetricSource();
+
+ private volatile ScheduledExecutorService executorService;
+ private volatile ScheduledFuture<?> taskFuture;
+
+ /** Constructor that uses the default metrics update period. */
+ public LogStorageMetrics(
+ String nodeName,
+ MetricManager metricManager,
+ LogStorageManager cmgLogStorageManager,
+ LogStorageManager metastorageLogStorageManager,
+ LogStorageManager partitionsLogStorageManager,
+ VolatileLogStorageManagerCreator volatileLogStorageManagerCreator
+ ) {
+ this(
+ nodeName,
+ metricManager,
+ cmgLogStorageManager,
+ metastorageLogStorageManager,
+ partitionsLogStorageManager,
+ volatileLogStorageManagerCreator,
+ DEFAULT_UPDATE_PERIOD_MS
+ );
+ }
+
+ /** Constructor that allows to specify the metrics update period in milliseconds. */
+ LogStorageMetrics(
+ String nodeName,
+ MetricManager metricManager,
+ LogStorageManager cmgLogStorageManager,
+ LogStorageManager metastorageLogStorageManager,
+ LogStorageManager partitionsLogStorageManager,
+ VolatileLogStorageManagerCreator volatileLogStorageManagerCreator,
+ long updatePeriodMs
+ ) {
+ this.nodeName = nodeName;
+ this.metricManager = metricManager;
+ this.cmgLogStorageManager = cmgLogStorageManager;
+ this.metastorageLogStorageManager = metastorageLogStorageManager;
+ this.partitionsLogStorageManager = partitionsLogStorageManager;
+ this.volatileLogStorageManagerCreator =
volatileLogStorageManagerCreator;
+ this.updatePeriodMs = updatePeriodMs;
+ }
+
+ @Override
+ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+ metricManager.registerSource(metricSource);
+ metricManager.enable(metricSource);
+
+ executorService = Executors.newSingleThreadScheduledExecutor(
+ IgniteThreadFactory.create(nodeName,
"log-storage-metrics-collector", LOG)
+ );
+ taskFuture = executorService.scheduleAtFixedRate(this::updateMetrics,
0, updatePeriodMs, MILLISECONDS);
+
+ return nullCompletedFuture();
+ }
+
+ private void updateMetrics() {
+ try {
+ metricSource.cmgLogStorageSize(cmgLogStorageManager.totalBytesOnDisk());
+ metricSource.metastorageLogStorageSize(metastorageLogStorageManager.totalBytesOnDisk());
+ metricSource.partitionsLogStorageSize(
+ partitionsLogStorageManager.totalBytesOnDisk() +
volatileLogStorageManagerCreator.totalBytesOnDisk()
+ );
+ } catch (Exception | AssertionError e) {
+ if (!hasCause(e, NodeStoppingException.class)) {
+ LOG.warn("Failed to update log storage metrics", e);
+ }
+ } catch (Error e) {
+ LOG.error("Failed to update log storage metrics, no more updates will happen", e);
+
+ throw e;
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ ScheduledFuture<?> futureToCancel = taskFuture;
+ if (futureToCancel != null) {
+ futureToCancel.cancel(true);
+ }
+
+ ScheduledExecutorService executorToShutdown = executorService;
+ if (executorToShutdown != null) {
+ executorToShutdown.shutdownNow();
+
+ // If we fail to finish waiting, our task might end up hitting a stopped RocksDB instance that will crash the node.
+ try {
+ executorToShutdown.awaitTermination(Long.MAX_VALUE, SECONDS);
+ } catch (InterruptedException e) {
+ // Ok, we are interrupted, cannot wait any longer.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ metricManager.unregisterSource(metricSource);
+
+ return nullCompletedFuture();
+ }
+}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSourceTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSourceTest.java
new file mode 100644
index 00000000000..e2b70a553c3
--- /dev/null
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSourceTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import static java.util.stream.Collectors.toUnmodifiableSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Set;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class LogStorageMetricSourceTest {
+ private final LogStorageMetricSource metricSource = new
LogStorageMetricSource();
+
+ private MetricSet metricSet;
+
+ @BeforeEach
+ void setUp() {
+ MetricSet ms = metricSource.enable();
+ assertThat(ms, is(notNullValue()));
+ metricSet = ms;
+ }
+
+ @Test
+ void metricSetIsAsExpected() {
+ Set<String> metricNames =
StreamSupport.stream(metricSet.spliterator(), false)
+ .map(Metric::name)
+ .collect(toUnmodifiableSet());
+
+ assertThat(
+ metricNames,
+ is(Set.of("CmgLogStorageSize", "MetastorageLogStorageSize",
"PartitionsLogStorageSize", "TotalLogStorageSize"))
+ );
+ }
+
+ @Test
+ void metricsAreInitializedToZero() {
+ assertThatLongGaugeHasValue("CmgLogStorageSize", 0);
+ assertThatLongGaugeHasValue("MetastorageLogStorageSize", 0);
+ assertThatLongGaugeHasValue("PartitionsLogStorageSize", 0);
+ assertThatLongGaugeHasValue("TotalLogStorageSize", 0);
+ }
+
+ private void assertThatLongGaugeHasValue(String metricName, long
expectedValue) {
+ LongGauge gauge = metricSet.get(metricName);
+
+ assertThat(gauge, is(notNullValue()));
+ assertThat(gauge.value(), is(expectedValue));
+ }
+
+ @Test
+ void cmgLogStorageSizeIsUpdated() {
+ metricSource.cmgLogStorageSize(100);
+
+ assertThatLongGaugeHasValue("CmgLogStorageSize", 100);
+ }
+
+ @Test
+ void metastorageLogStorageSizeIsUpdated() {
+ metricSource.metastorageLogStorageSize(200);
+
+ assertThatLongGaugeHasValue("MetastorageLogStorageSize", 200);
+ }
+
+ @Test
+ void partitionsLogStorageSizeIsUpdated() {
+ metricSource.partitionsLogStorageSize(300);
+
+ assertThatLongGaugeHasValue("PartitionsLogStorageSize", 300);
+ }
+
+ @Test
+ void totalLogStorageSizeIsUpdated() {
+ metricSource.cmgLogStorageSize(1);
+ metricSource.metastorageLogStorageSize(10);
+ metricSource.partitionsLogStorageSize(100);
+
+ assertThatLongGaugeHasValue("TotalLogStorageSize", 111);
+ }
+}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
new file mode 100644
index 00000000000..67bc7cd5896
--- /dev/null
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.raft.storage.LogStorageManager;
+import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageManagerCreator;
+import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.DefaultLogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(WorkDirectoryExtension.class)
+class LogStorageMetricsTest {
+ @WorkDirectory
+ private Path workDir;
+
+ private LogStorageManager cmgLogStorageManager;
+ private LogStorageManager metastorageLogStorageManager;
+ private LogStorageManager partitionsLogStorageManager;
+
+ private VolatileLogStorageManagerCreator volatileLogStorageManagerCreator;
+
+ private final TestMetricManager metricManager = new TestMetricManager();
+
+ private LogStorageMetrics logStorageMetrics;
+
+ private final RaftOptions raftOptions = new RaftOptions();
+
+ private final LogStorageOptions logStorageOptions = new
LogStorageOptions();
+
+ @BeforeEach
+ void setUp() {
+ logStorageOptions.setConfigurationManager(new ConfigurationManager());
+ logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance());
+
+ String nodeName = "test";
+
+ cmgLogStorageManager = SharedLogStorageManagerUtils.create(nodeName,
workDir.resolve("cmg"));
+ metastorageLogStorageManager =
SharedLogStorageManagerUtils.create(nodeName, workDir.resolve("metastorage"));
+ partitionsLogStorageManager =
SharedLogStorageManagerUtils.create(nodeName, workDir.resolve("partitions"));
+
+ volatileLogStorageManagerCreator = new
VolatileLogStorageManagerCreator(nodeName, workDir.resolve("spillout"));
+
+ logStorageMetrics = new LogStorageMetrics(
+ nodeName,
+ metricManager,
+ cmgLogStorageManager,
+ metastorageLogStorageManager,
+ partitionsLogStorageManager,
+ volatileLogStorageManagerCreator,
+ 10
+ );
+
+ CompletableFuture<Void> startFuture = startAsync(
+ new ComponentContext(),
+ cmgLogStorageManager,
+ metastorageLogStorageManager,
+ partitionsLogStorageManager,
+ volatileLogStorageManagerCreator,
+ logStorageMetrics
+ );
+ assertThat(startFuture, willCompleteSuccessfully());
+ }
+
+ @AfterEach
+ void cleanup() {
+ CompletableFuture<Void> stopFuture = stopAsync(
+ new ComponentContext(),
+ logStorageMetrics,
+ volatileLogStorageManagerCreator,
+ partitionsLogStorageManager,
+ metastorageLogStorageManager,
+ cmgLogStorageManager
+ );
+ assertThat(stopFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void cmgLogStorageSizeIsAccountedFor() {
+ testLogStorageSizeIsAccountedFor(cmgLogStorageManager, "cmg",
"CmgLogStorageSize");
+ }
+
+ private void testLogStorageSizeIsAccountedFor(LogStorageManager
logStorageManager, String groupUri, String metricName) {
+ LogStorage logStorage = logStorageManager.createLogStorage(groupUri,
raftOptions);
+ logStorage.init(logStorageOptions);
+
+ logStorage.appendEntry(dataLogEntry(1, randomBytes(new Random(),
1000)));
+
+ waitForLongGaugeValue(metricName, is(greaterThanOrEqualTo(1000L)));
+ waitForLongGaugeValue("TotalLogStorageSize",
is(greaterThanOrEqualTo(1000L)));
+ }
+
+ private static LogEntry dataLogEntry(int index, byte[] content) {
+ LogEntry logEntry = new LogEntry();
+
+ logEntry.setId(new LogId(index, 1));
+ logEntry.setType(EntryType.ENTRY_TYPE_DATA);
+ logEntry.setData(ByteBuffer.wrap(content));
+
+ return logEntry;
+ }
+
+ private void waitForLongGaugeValue(String metricName, Matcher<Long>
valueMatcher) {
+ Metric metric = metricManager.metric(LogStorageMetricSource.NAME,
metricName);
+ assertThat(metric, isA(LongGauge.class));
+ LongGauge gauge = (LongGauge) metric;
+
+ assertThat(gauge, is(notNullValue()));
+
+ await().until(gauge::value, valueMatcher);
+ }
+
+ @Test
+ void metastorageLogStorageSizeIsAccountedFor() {
+ testLogStorageSizeIsAccountedFor(metastorageLogStorageManager,
"metastorage", "MetastorageLogStorageSize");
+ }
+
+ @Test
+ void partitionsLogStorageSizeIsAccountedFor() {
+ testLogStorageSizeIsAccountedFor(partitionsLogStorageManager, new
ZonePartitionId(1, 0).toString(), "PartitionsLogStorageSize");
+ }
+}