This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 990903fb204 IGNITE-17005 Added a snapshot create operation metrics (#10036)
990903fb204 is described below

commit 990903fb204bc2e9bb138a6ad3a83e0fa2a1373f
Author: Nikita Amelchev <[email protected]>
AuthorDate: Tue May 31 19:33:02 2022 +0300

    IGNITE-17005 Added a snapshot create operation metrics (#10036)
---
 .../snapshot/IgniteSnapshotManager.java            |  27 ++++++
 .../persistence/snapshot/SnapshotFutureTask.java   |  27 +++++-
 ....java => IgniteClusterSnapshotMetricsTest.java} | 107 +++++++++++++++++++--
 .../IgniteSnapshotWithIndexingTestSuite.java       |   4 +-
 4 files changed, 156 insertions(+), 9 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index ed560bb58c3..ae9d339da6f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -509,6 +509,18 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
             "The list of names of all snapshots currently saved on the local 
node with respect to " +
                 "the configured via IgniteConfiguration snapshot working 
path.");
 
+        mreg.register("CurrentSnapshotTotalSize", () -> {
+            SnapshotFutureTask task = currentSnapshotTask();
+
+            return task == null ? -1 : task.totalSize();
+        }, "Estimated size of current cluster snapshot in bytes on this node. 
The value may grow during snapshot creation.");
+
+        mreg.register("CurrentSnapshotProcessedSize", () -> {
+            SnapshotFutureTask task = currentSnapshotTask();
+
+            return task == null ? -1 : task.processedSize();
+        }, "Processed size of current cluster snapshot in bytes on this 
node.");
+
         restoreCacheGrpProc.registerMetrics();
 
         cctx.exchange().registerExchangeAwareComponent(this);
@@ -1825,6 +1837,21 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         }
     }
 
+    /** @return Task stored in {@code locSnpTasks} for the current cluster snapshot request, {@code null} if no request or not a {@link SnapshotFutureTask}. */
+    private SnapshotFutureTask currentSnapshotTask() {
+        SnapshotOperationRequest curReq = clusterSnpReq;
+
+        if (curReq == null)
+            return null;
+
+        AbstractSnapshotFutureTask<?> locTask = locSnpTasks.get(curReq.snapshotName());
+
+        if (locTask instanceof SnapshotFutureTask)
+            return (SnapshotFutureTask)locTask;
+
+        return null;
+    }
+
     /**
      * @param factory Factory which produces {@link LocalSnapshotSender} 
implementation.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 81d5f52644d..bdc26c09265 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -141,6 +142,12 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
     /** Flag indicates that task already scheduled on checkpoint. */
     private final AtomicBoolean started = new AtomicBoolean();
 
+    /** Estimated snapshot size in bytes. The value may grow during snapshot 
creation. */
+    private final AtomicLong totalSize = new AtomicLong();
+
+    /** Processed snapshot size in bytes. */
+    private final AtomicLong processedSize = new AtomicLong();
+
     /**
      * @param cctx Shared context.
      * @param srcNodeId Node id which cause snapshot task creation.
@@ -484,6 +491,8 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
 
                     Long partLen = partFileLengths.get(pair);
 
+                    totalSize.addAndGet(partLen);
+
                     CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
                         wrapExceptionIfStarted(() -> {
                             snpSndr.sendPart(
@@ -494,6 +503,8 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
 
                             // Stop partition writer.
                             
partDeltaWriters.get(pair).markPartitionProcessed();
+
+                            processedSize.addAndGet(partLen);
                         }),
                         snpSndr.executor())
                         // Wait for the completion of both futures - 
checkpoint end, copy partition.
@@ -512,6 +523,8 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
 
                                 snpSndr.sendDelta(delta, cacheDirName, pair);
 
+                                processedSize.addAndGet(delta.length());
+
                                 boolean deleted = delta.delete();
 
                                 assert deleted;
@@ -596,6 +609,16 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
         return closeFut;
     }
 
+    /** @return Current value of the estimated snapshot size counter, in bytes. It may still grow while the snapshot is being created. */
+    public long totalSize() {
+        return totalSize.longValue();
+    }
+
+    /** @return Current value of the processed snapshot size counter, in bytes. */
+    public long processedSize() {
+        return processedSize.longValue();
+    }
+
     /** {@inheritDoc} */
     @Override public boolean cancel() {
         super.cancel();
@@ -930,7 +953,9 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<Set<GroupPartitionId
             }
 
             // Write buffer to the end of the file.
-            deltaFileIo.writeFully(pageBuf);
+            int len = deltaFileIo.writeFully(pageBuf);
+
+            totalSize.addAndGet(len);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreMetricsTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
similarity index 69%
rename from 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreMetricsTest.java
rename to 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
index b29c5a97893..b2bad4e8b3b 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreMetricsTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
@@ -19,29 +19,41 @@ package 
org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
 import java.io.FilenameFilter;
+import java.io.Serializable;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.management.AttributeNotFoundException;
 import javax.management.DynamicMBean;
 import javax.management.MBeanException;
 import javax.management.ReflectionException;
+import org.apache.commons.io.FileUtils;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.After;
@@ -50,12 +62,14 @@ import org.junit.Test;
 
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_TRANSFER_RATE_DMS_KEY;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.SNAPSHOT_RESTORE_METRICS;
 
 /**
- * Tests snapshot restore metrics.
+ * Tests snapshot create/restore metrics.
  */
-public class IgniteClusterSnapshotRestoreMetricsTest extends 
IgniteClusterSnapshotRestoreBaseTest {
+public class IgniteClusterSnapshotMetricsTest extends 
IgniteClusterSnapshotRestoreBaseTest {
     /** Separate working directory prefix. */
     private static final String DEDICATED_DIR_PREFIX = "dedicated-";
 
@@ -99,7 +113,7 @@ public class IgniteClusterSnapshotRestoreMetricsTest extends 
IgniteClusterSnapsh
     @Override public void beforeTestSnapshot() throws Exception {
         super.beforeTestSnapshot();
 
-        cleanuo();
+        cleanup();
     }
 
     /** {@inheritDoc} */
@@ -107,13 +121,13 @@ public class IgniteClusterSnapshotRestoreMetricsTest 
extends IgniteClusterSnapsh
     @Override public void afterTestSnapshot() throws Exception {
         super.afterTestSnapshot();
 
-        cleanuo();
+        cleanup();
     }
 
     /** @throws Exception If fails. */
     @Test
     public void testRestoreSnapshotProgress() throws Exception {
-        // Caches with differebt partition distribution.
+        // Caches with different partition distribution.
         CacheConfiguration<Integer, Object> ccfg1 = 
cacheConfig("cache1").setBackups(0);
         CacheConfiguration<Integer, Object> ccfg2 = 
cacheConfig("cache2").setCacheMode(CacheMode.REPLICATED);
 
@@ -212,6 +226,87 @@ public class IgniteClusterSnapshotRestoreMetricsTest 
extends IgniteClusterSnapsh
         }
     }
 
+    /** @throws Exception If fails. */
+    @Test
+    public void testCreateSnapshotProgress() throws Exception {
+        CacheConfiguration<Integer, Object> ccfg1 = cacheConfig("cache1");
+
+        IgniteEx ignite = startGridsWithCache(DEDICATED_CNT, CACHE_KEYS_RANGE, key -> new Account(key, key), ccfg1);
+
+        MetricRegistry mreg = ignite.context().metric().registry(SNAPSHOT_METRICS);
+
+        LongMetric totalSize = mreg.findMetric("CurrentSnapshotTotalSize");
+        LongMetric processedSize = mreg.findMetric("CurrentSnapshotProcessedSize");
+
+        assertEquals(-1, totalSize.value());
+        assertEquals(-1, processedSize.value());
+
+        // Transfer rate limit: a fifth of the size of this node's persistent store directory.
+        PdsFolderSettings<?> folderSettings = ignite.context().pdsFolderResolver().resolveFolders();
+        File storeWorkDir = new File(folderSettings.persistentStoreRootPath(), folderSettings.folderName());
+
+        long rate = FileUtils.sizeOfDirectory(storeWorkDir) / 5;
+
+        // Limit snapshot transfer rate so that the metrics can be sampled several times before the snapshot completes.
+        DistributedChangeableProperty<Serializable> rateProp =
+            ignite.context().distributedConfiguration().property(SNAPSHOT_TRANSFER_RATE_DMS_KEY);
+
+        rateProp.propagate(rate);
+
+        // Start cluster snapshot.
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        // Run concurrent load until the snapshot future completes.
+        IgniteInternalFuture<?> loadFut = GridTestUtils.runAsync(() -> {
+            IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(ccfg1);
+
+            while (!fut.isDone()) {
+                Integer key = ThreadLocalRandom.current().nextInt(CACHE_KEYS_RANGE);
+                Account val = new Account(ThreadLocalRandom.current().nextInt(), ThreadLocalRandom.current().nextInt());
+
+                cache.put(key, val);
+            }
+        });
+
+        List<Long> totalVals = new ArrayList<>();
+        List<Long> processedVals = new ArrayList<>();
+
+        // Store the sampled pair itself (a re-read may already return -1); processed is read first so it can't exceed total.
+        while (!fut.isDone()) {
+            long processed = processedSize.value();
+            long total = totalSize.value();
+
+            if (total != -1 && processed != -1) {
+                totalVals.add(total);
+                processedVals.add(processed);
+            }
+
+            U.sleep(500);
+        }
+
+        fut.get(getTestTimeout());
+
+        loadFut.get();
+
+        assertTrue("Expected distinct values: " + totalVals,
+            totalVals.stream().mapToLong(v -> v).distinct().count() > 1);
+        assertTrue("Expected distinct values: " + processedVals,
+            processedVals.stream().mapToLong(v -> v).distinct().count() > 1);
+
+        assertTrue("Expected sorted values: " + totalVals,
+            F.isSorted(totalVals.stream().mapToLong(v -> v).toArray()));
+        assertTrue("Expected sorted values: " + processedVals,
+            F.isSorted(processedVals.stream().mapToLong(v -> v).toArray()));
+
+        for (int i = 0; i < totalVals.size(); i++) {
+            assertTrue("Total size less than processed [total=" + totalVals + ", processed=" + processedVals + ']',
+                processedVals.get(i) <= totalVals.get(i));
+        }
+
+        assertEquals(-1, totalSize.value());
+        assertEquals(-1, processedSize.value());
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -245,7 +340,7 @@ public class IgniteClusterSnapshotRestoreMetricsTest 
extends IgniteClusterSnapsh
     /**
      * @throws Exception If failed.
      */
-    private void cleanuo() throws Exception {
+    private void cleanup() throws Exception {
         FilenameFilter filter = (file, name) -> file.isDirectory() && 
name.startsWith(DEDICATED_DIR_PREFIX);
 
         for (File file : new File(U.defaultWorkDirectory()).listFiles(filter))
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
index 40d66543522..be36d972ca1 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest;
-import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreMetricsTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotMetricsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreWithIndexingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
 import org.junit.runner.RunWith;
@@ -30,7 +30,7 @@ import org.junit.runners.Suite;
     IgniteClusterSnapshotWithIndexesTest.class,
     IgniteClusterSnapshotCheckWithIndexesTest.class,
     IgniteClusterSnapshotRestoreWithIndexingTest.class,
-    IgniteClusterSnapshotRestoreMetricsTest.class
+    IgniteClusterSnapshotMetricsTest.class
 })
 public class IgniteSnapshotWithIndexingTestSuite {
 }

Reply via email to