This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 696b49676ff IGNITE-22912 Add metrics related to Meta Storage 
availability (#7845)
696b49676ff is described below

commit 696b49676ffaa7e742663d3e32e12387dcafaa0b
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri Mar 27 17:57:20 2026 +0400

    IGNITE-22912 Add metrics related to Meta Storage availability (#7845)
---
 .../ignite/internal/cli/CliIntegrationTest.java    |  1 +
 .../metastorage/impl/MetaStorageManagerImpl.java   | 78 ++++++++++++++++++++-
 .../metrics/MetaStorageMetricSource.java           | 32 ++++++++-
 .../metrics/MetaStorageMetricSourceTest.java       | 79 ++++++++++++++++++++++
 .../rest/metrics/ItMetricControllerTest.java       |  1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java | 11 ++-
 .../distributed/schema/SchemaSyncMetricSource.java | 78 +++++++++++++++++++++
 .../distributed/schema/SchemaSyncServiceImpl.java  | 30 +++++++-
 .../schema/SchemaSyncMetricSourceTest.java         | 78 +++++++++++++++++++++
 .../schema/SchemaSyncServiceImplTest.java          | 58 ++++++++++++++++
 10 files changed, 440 insertions(+), 6 deletions(-)

diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index b2ab7350714..07121038d86 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -66,6 +66,7 @@ public abstract class CliIntegrationTest extends 
ClusterPerClassIntegrationTest
             new MetricSource().name("os").enabled(true),
             new MetricSource().name("raft").enabled(true),
             new MetricSource().name("metastorage").enabled(true),
+            new MetricSource().name("schema.sync").enabled(true),
             new MetricSource().name("client.handler").enabled(true),
             new MetricSource().name("sql.client").enabled(true),
             new MetricSource().name("sql.plan.cache").enabled(true),
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index eee80e1e468..bea68877d4e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -38,10 +38,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -49,6 +52,7 @@ import java.util.function.Supplier;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import org.apache.ignite.internal.cluster.management.MetaStorageInfo;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
@@ -108,6 +112,7 @@ import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -214,6 +219,24 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     /** Tracks only reads from the leader, local reads are tracked by the 
storage itself. */
     private final ReadOperationForCompactionTracker 
readOperationFromLeaderForCompactionTracker;
 
+    /** Current Meta Storage voting peers (consistent IDs), updated on each 
committed Raft configuration. */
+    private volatile Set<String> currentVotingPeers = Set.of();
+
+    /**
+     * Meta Storage availability flag: {@code true} if the Meta Storage majority 
can execute commands, {@code false} otherwise.
+     * Updated by the periodic availability check; remains {@code false} until the 
first check succeeds.
+     */
+    private volatile boolean msAvailable = false;
+
+    /** Periodic executor that checks Meta Storage availability. */
+    private @Nullable ScheduledExecutorService availabilityCheckExecutor;
+
+    /** Interval between availability checks, in milliseconds. */
+    private static final long AVAILABILITY_CHECK_PERIOD_MS = 5_000L;
+
+    /** Timeout for a single availability check command, in milliseconds. */
+    private static final long AVAILABILITY_CHECK_TIMEOUT_MS = 5_000L;
+
     private final MetastorageDivergencyValidator divergencyValidator = new 
MetastorageDivergencyValidator();
 
     private final RecoveryRevisionsListenerImpl recoveryRevisionsListener;
@@ -256,7 +279,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         this.storage = storage;
         this.clock = clock;
         this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), 
busyLock, clock, failureProcessor);
-        this.metaStorageMetricSource = new 
MetaStorageMetricSource(clusterTime);
+        this.metaStorageMetricSource = new 
MetaStorageMetricSource(clusterTime, this::computeAvailablePeers, () -> 
msAvailable ? 1 : 0);
         this.topologyAwareRaftGroupServiceFactory = 
topologyAwareRaftGroupServiceFactory;
         this.metricManager = metricManager;
         this.metastorageRepairStorage = metastorageRepairStorage;
@@ -618,6 +641,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     private void onConfigurationCommitted(RaftGroupConfiguration 
configuration) {
         LOG.info("MS configuration committed {}", configuration);
 
+        currentVotingPeers = Set.copyOf(configuration.peers());
+
         // TODO: IGNITE-23210 - use thenAccept() when implemented.
         raftServiceFuture
                 .handle((raftService, ex) -> {
@@ -772,6 +797,16 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         metricManager.registerSource(metaStorageMetricSource);
         metricManager.enable(metaStorageMetricSource);
 
+        availabilityCheckExecutor = Executors.newSingleThreadScheduledExecutor(
+                IgniteThreadFactory.create(clusterService.nodeName(), 
"metastorage-availability-check", LOG)
+        );
+        availabilityCheckExecutor.scheduleWithFixedDelay(
+                this::checkMgAvailability,
+                AVAILABILITY_CHECK_PERIOD_MS,
+                AVAILABILITY_CHECK_PERIOD_MS,
+                TimeUnit.MILLISECONDS
+        );
+
         return nullCompletedFuture();
     }
 
@@ -809,6 +844,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
         try {
             IgniteUtils.closeAllManually(
+                    () -> 
IgniteUtils.shutdownAndAwaitTermination(availabilityCheckExecutor, 10, 
TimeUnit.SECONDS),
                     () -> 
metricManager.unregisterSource(metaStorageMetricSource),
                     clusterTime,
                     () -> failOrConsume(metaStorageSvcFut, new 
NodeStoppingException(), MetaStorageServiceImpl::close),
@@ -1101,6 +1137,46 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         }
     }
 
+    /**
+     * Returns the number of Meta Storage voting peers that are present in the 
current logical topology.
+     */
+    private int computeAvailablePeers() {
+        Set<String> peers = currentVotingPeers;
+
+        if (peers.isEmpty()) {
+            return 0;
+        }
+
+        int count = 0;
+
+        for (LogicalNode node : 
logicalTopologyService.localLogicalTopology().nodes()) {
+            if (peers.contains(node.name())) {
+                count++;
+            }
+        }
+
+        return count;
+    }
+
+    /**
+     * Performs a single Meta Storage availability check by requesting the current 
revisions from the Meta Storage service; invoked periodically.
+     * Asynchronously updates {@link #msAvailable} based on whether the request 
succeeds within the timeout.
+     */
+    private void checkMgAvailability() {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenCompose(MetaStorageServiceImpl::currentRevisions)
+                    .orTimeout(AVAILABILITY_CHECK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)
+                    .whenComplete((rev, ex) -> msAvailable = ex == null);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     private void onSafeTimeAdvanced(HybridTimestamp time) {
         assert time != null;
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java
index 93570716593..59249c26290 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.metastorage.metrics;
 
 import java.util.List;
+import java.util.function.IntSupplier;
 import 
org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource.Holder;
 import org.apache.ignite.internal.metrics.AbstractMetricSource;
 import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.IntGauge;
 import org.apache.ignite.internal.metrics.LongGauge;
 import org.apache.ignite.internal.metrics.LongMetric;
 import org.apache.ignite.internal.metrics.Metric;
@@ -33,13 +35,23 @@ public class MetaStorageMetricSource extends 
AbstractMetricSource<Holder> {
 
     private final MetaStorageMetrics metaStorageMetrics;
 
+    private final IntSupplier availablePeersSupplier;
+
+    private final IntSupplier availableSupplier;
+
     /**
      * Constructor.
      */
-    public MetaStorageMetricSource(MetaStorageMetrics metaStorageMetrics) {
+    public MetaStorageMetricSource(
+            MetaStorageMetrics metaStorageMetrics,
+            IntSupplier availablePeersSupplier,
+            IntSupplier availableSupplier
+    ) {
         super(SOURCE_NAME);
 
         this.metaStorageMetrics = metaStorageMetrics;
+        this.availablePeersSupplier = availablePeersSupplier;
+        this.availableSupplier = availableSupplier;
     }
 
     @Override
@@ -63,7 +75,7 @@ public class MetaStorageMetricSource extends 
AbstractMetricSource<Holder> {
     protected class Holder implements AbstractMetricSource.Holder<Holder> {
         private final LongMetric safeTimeLag = new LongGauge(
                 "SafeTimeLag",
-                "Number of milliseconds the local MetaStorage SafeTime lags 
behind the local logical clock.",
+                "Number of milliseconds the local Meta Storage SafeTime lags 
behind the local logical clock.",
                 metaStorageMetrics::safeTimeLag
         );
 
@@ -72,9 +84,23 @@ public class MetaStorageMetricSource extends 
AbstractMetricSource<Holder> {
                 "The current size of the cache of idempotent commands' 
results."
         );
 
+        private final IntGauge availablePeers = new IntGauge(
+                "AvailablePeers",
+                "Number of available members of the Meta Storage voting set 
based on the current logical topology.",
+                availablePeersSupplier
+        );
+
+        private final IntGauge majorityAvailable = new IntGauge(
+                "MajorityAvailable",
+                "1 if the Meta Storage majority is available (can execute 
commands), 0 otherwise.",
+                availableSupplier
+        );
+
         private final List<Metric> metrics = List.of(
                 safeTimeLag,
-                idempotentCacheSize
+                idempotentCacheSize,
+                availablePeers,
+                majorityAvailable
         );
 
         @Override
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSourceTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSourceTest.java
new file mode 100644
index 00000000000..ff584dc1ec8
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSourceTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.ignite.internal.metrics.IntMetric;
+import org.apache.ignite.internal.metrics.MetricRegistry;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link MetaStorageMetricSource}. */
+class MetaStorageMetricSourceTest extends BaseIgniteAbstractTest {
+    @Test
+    void availablePeersMetricReadsFromSupplier() {
+        int[] value = {3};
+
+        MetaStorageMetricSource source = new MetaStorageMetricSource(() -> 0L, 
() -> value[0], () -> 0);
+        MetricSet metricSet = enableSource(source);
+
+        IntMetric availablePeers = metricSet.get("AvailablePeers");
+        assertNotNull(availablePeers);
+        assertEquals(3, availablePeers.value());
+
+        value[0] = 1;
+        assertEquals(1, availablePeers.value());
+    }
+
+    @Test
+    void availableMetricReadsFromSupplier() {
+        int[] value = {0};
+
+        MetaStorageMetricSource source = new MetaStorageMetricSource(() -> 0L, 
() -> 0, () -> value[0]);
+        MetricSet metricSet = enableSource(source);
+
+        IntMetric available = metricSet.get("MajorityAvailable");
+        assertNotNull(available);
+        assertEquals(0, available.value());
+
+        value[0] = 1;
+        assertEquals(1, available.value());
+    }
+
+    @Test
+    void idempotentCacheSizeIsUpdatedViaCallback() {
+        MetaStorageMetricSource source = new MetaStorageMetricSource(() -> 0L, 
() -> 0, () -> 0);
+        MetricSet metricSet = enableSource(source);
+
+        IntMetric cacheSize = metricSet.get("IdempotentCacheSize");
+        assertNotNull(cacheSize);
+        assertEquals(0, cacheSize.value());
+
+        source.onIdempotentCacheSizeChange(42);
+        assertEquals(42, cacheSize.value());
+    }
+
+    private static MetricSet enableSource(MetaStorageMetricSource source) {
+        MetricRegistry registry = new MetricRegistry();
+        registry.registerSource(source);
+        return registry.enable(source);
+    }
+}
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 3ead07d5d90..e9789aabd6b 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -54,6 +54,7 @@ class ItMetricControllerTest extends 
ClusterPerClassIntegrationTest {
             new MetricSource("os", true),
             new MetricSource("raft", true),
             new MetricSource("metastorage", true),
+            new MetricSource("schema.sync", true),
             new MetricSource("client.handler", true),
             new MetricSource("sql.client", true),
             new MetricSource("sql.plan.cache", true),
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 5926b94d810..3471002eb7f 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -277,6 +277,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeColl
 import 
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
+import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncMetricSource;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -944,7 +945,15 @@ public class IgniteImpl implements Ignite {
         schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), 
metaStorageMgr.watchExecutor());
         
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
 
-        SchemaSyncService schemaSyncService = new 
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
+        SchemaSyncMetricSource schemaSyncMetricSource = new 
SchemaSyncMetricSource();
+        metricManager.registerSource(schemaSyncMetricSource);
+        metricManager.enable(schemaSyncMetricSource);
+
+        SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(
+                schemaSafeTimeTracker,
+                delayDurationMsSupplier,
+                schemaSyncMetricSource::recordWait
+        );
 
         schemaManager = new SchemaManager(registry, catalogManager);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSource.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSource.java
new file mode 100644
index 00000000000..044943c2edd
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.Metric;
+
+/**
+ * Metric source for schema synchronization metrics.
+ */
+public class SchemaSyncMetricSource extends 
AbstractMetricSource<SchemaSyncMetricSource.Holder> {
+    private static final String SOURCE_NAME = "schema.sync";
+
+    /**
+     * Constructor.
+     */
+    public SchemaSyncMetricSource() {
+        super(SOURCE_NAME);
+    }
+
+    /**
+     * Histogram bounds (in milliseconds) for schema sync wait time 
distribution.
+     * Buckets: [0..1], (1..5], (5..10], (10..50], (50..100], (100..500], 
(500..1000], (1000..5000], (5000..inf).
+     */
+    private static final long[] WAIT_BOUNDS_MS = {1, 5, 10, 50, 100, 500, 
1000, 5000};
+
+    /**
+     * Records a completed schema sync wait with the given duration; does nothing 
while the source is not enabled.
+     *
+     * @param durationMs Duration of the wait in milliseconds.
+     */
+    public void recordWait(long durationMs) {
+        Holder holder = holder();
+
+        if (holder != null) {
+            holder.waits.add(durationMs);
+        }
+    }
+
+    @Override
+    protected Holder createHolder() {
+        return new Holder();
+    }
+
+    /** Holder. */
+    protected static class Holder implements 
AbstractMetricSource.Holder<Holder> {
+        private final DistributionMetric waits = new DistributionMetric(
+                "Waits",
+                "Histogram of schema synchronization wait times in 
milliseconds."
+                        + " High values may indicate Meta Storage 
unavailability or slowness.",
+                WAIT_BOUNDS_MS
+        );
+
+        private final List<Metric> metrics = List.of(waits);
+
+        @Override
+        public Iterable<Metric> metrics() {
+            return metrics;
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
index 0a1b3e6a752..5bb1ce0cccc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.table.distributed.schema;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.SchemaSafeTimeTracker;
@@ -31,17 +33,43 @@ public class SchemaSyncServiceImpl implements 
SchemaSyncService {
 
     private final LongSupplier delayDurationMs;
 
+    private final LongConsumer waitDurationMsRecorder;
+
     /**
      * Constructor.
      */
     public SchemaSyncServiceImpl(SchemaSafeTimeTracker schemaSafeTimeTracker, 
LongSupplier delayDurationMs) {
+        this(schemaSafeTimeTracker, delayDurationMs, durationMs -> {});
+    }
+
+    /**
+     * Constructor with metrics recording.
+     *
+     * @param schemaSafeTimeTracker Schema safe time tracker.
+     * @param delayDurationMs Supplier of the delay duration in milliseconds.
+     * @param waitDurationMsRecorder Consumer that receives the duration (in 
ms) of each completed wait; not invoked if the wait future is already done.
+     */
+    public SchemaSyncServiceImpl(
+            SchemaSafeTimeTracker schemaSafeTimeTracker,
+            LongSupplier delayDurationMs,
+            LongConsumer waitDurationMsRecorder
+    ) {
         this.schemaSafeTimeTracker = schemaSafeTimeTracker;
         this.delayDurationMs = delayDurationMs;
+        this.waitDurationMsRecorder = waitDurationMsRecorder;
     }
 
     @Override
     public CompletableFuture<Void> waitForMetadataCompleteness(HybridTimestamp 
ts) {
-        return schemaSafeTimeTracker.waitFor(metastoreSafeTimeToWait(ts));
+        CompletableFuture<Void> future = 
schemaSafeTimeTracker.waitFor(metastoreSafeTimeToWait(ts));
+
+        if (future.isDone()) {
+            return future;
+        }
+
+        long startNs = System.nanoTime();
+
+        return future.whenComplete((v, ex) -> 
waitDurationMsRecorder.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNs)));
     }
 
     private HybridTimestamp metastoreSafeTimeToWait(HybridTimestamp ts) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSourceTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSourceTest.java
new file mode 100644
index 00000000000..72954835b11
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSourceTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.MetricRegistry;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link SchemaSyncMetricSource}. */
+class SchemaSyncMetricSourceTest extends BaseIgniteAbstractTest {
+    private final SchemaSyncMetricSource source = new SchemaSyncMetricSource();
+    private final MetricRegistry registry = new MetricRegistry();
+    private DistributionMetric waits;
+
+    @BeforeEach
+    void setUp() {
+        registry.registerSource(source);
+        MetricSet metricSet = registry.enable(source);
+
+        waits = metricSet.get("Waits");
+        assertNotNull(waits);
+    }
+
+    @Test
+    void allBucketsAreZeroInitially() {
+        for (long count : waits.value()) {
+            assertEquals(0L, count);
+        }
+    }
+
+    @Test
+    void recordWaitPopulatesCorrectBucket() {
+        source.recordWait(7);   // falls in bucket 2: (5..10]
+        source.recordWait(200); // falls in bucket 5: (100..500]
+        source.recordWait(3000); // falls in bucket 7: (1000..5000]
+
+        assertEquals(1L, waits.value()[2]);
+        assertEquals(1L, waits.value()[5]);
+        assertEquals(1L, waits.value()[7]);
+    }
+
+    @Test
+    void recordWaitAccumulatesCountsInSameBucket() {
+        source.recordWait(1); // bucket 0: [0..1]
+        source.recordWait(0); // bucket 0: [0..1]
+        source.recordWait(1); // bucket 0: [0..1]
+
+        assertEquals(3L, waits.value()[0]);
+    }
+
+    @Test
+    void recordWaitIsNoOpWhenSourceIsDisabled() {
+        SchemaSyncMetricSource disabledSource = new SchemaSyncMetricSource();
+        // not registered or enabled - should not throw
+        disabledSource.recordWait(100);
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
index f6535d02ad3..acba3a96839 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
@@ -19,11 +19,17 @@ package org.apache.ignite.internal.table.distributed.schema;
 
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.LongSupplier;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -70,4 +76,56 @@ class SchemaSyncServiceImplTest extends 
BaseIgniteAbstractTest {
         safeTimeFuture.complete(null);
         assertThat(waitFuture, willCompleteSuccessfully());
     }
+
+    @Test
+    void waitRecorderIsCalledWithDurationOnCompletion() {
+        List<Long> recorded = new ArrayList<>();
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, 
delayDurationMs, recorded::add);
+
+        HybridTimestamp ts = clock.now();
+        var safeTimeFuture = new CompletableFuture<Void>();
+
+        
when(schemaSafeTimeTracker.waitFor(ts.subtractPhysicalTime(delayDurationMs.getAsLong()))).thenReturn(safeTimeFuture);
+
+        schemaSyncService.waitForMetadataCompleteness(ts);
+
+        assertThat(recorded, empty());
+
+        safeTimeFuture.complete(null);
+
+        assertThat(recorded, hasSize(1));
+        assertThat(recorded.get(0), greaterThanOrEqualTo(0L));
+    }
+
+    @Test
+    void waitRecorderIsCalledEvenWhenFutureCompletesExceptionally() {
+        List<Long> recorded = new ArrayList<>();
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, 
delayDurationMs, recorded::add);
+
+        HybridTimestamp ts = clock.now();
+        var safeTimeFuture = new CompletableFuture<Void>();
+
+        
when(schemaSafeTimeTracker.waitFor(ts.subtractPhysicalTime(delayDurationMs.getAsLong()))).thenReturn(safeTimeFuture);
+
+        schemaSyncService.waitForMetadataCompleteness(ts);
+
+        safeTimeFuture.completeExceptionally(new RuntimeException("test 
error"));
+
+        assertThat(recorded, hasSize(1));
+        assertThat(recorded.get(0), greaterThanOrEqualTo(0L));
+    }
+
+    @Test
+    void waitRecorderIsNotCalledWhenFutureIsAlreadyCompleted() {
+        List<Long> recorded = new ArrayList<>();
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, 
delayDurationMs, recorded::add);
+
+        HybridTimestamp ts = clock.now();
+
+        
when(schemaSafeTimeTracker.waitFor(ts.subtractPhysicalTime(delayDurationMs.getAsLong()))).thenReturn(nullCompletedFuture());
+
+        schemaSyncService.waitForMetadataCompleteness(ts);
+
+        assertThat(recorded, empty());
+    }
 }

Reply via email to