This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 990903fb204 IGNITE-17005 Added a snapshot create operation metrics
(#10036)
990903fb204 is described below
commit 990903fb204bc2e9bb138a6ad3a83e0fa2a1373f
Author: Nikita Amelchev <[email protected]>
AuthorDate: Tue May 31 19:33:02 2022 +0300
IGNITE-17005 Added a snapshot create operation metrics (#10036)
---
.../snapshot/IgniteSnapshotManager.java | 27 ++++++
.../persistence/snapshot/SnapshotFutureTask.java | 27 +++++-
....java => IgniteClusterSnapshotMetricsTest.java} | 107 +++++++++++++++++++--
.../IgniteSnapshotWithIndexingTestSuite.java | 4 +-
4 files changed, 156 insertions(+), 9 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index ed560bb58c3..ae9d339da6f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -509,6 +509,18 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
"The list of names of all snapshots currently saved on the local
node with respect to " +
"the configured via IgniteConfiguration snapshot working
path.");
+ mreg.register("CurrentSnapshotTotalSize", () -> {
+ SnapshotFutureTask task = currentSnapshotTask();
+
+ return task == null ? -1 : task.totalSize();
+ }, "Estimated size of current cluster snapshot in bytes on this node.
The value may grow during snapshot creation.");
+
+ mreg.register("CurrentSnapshotProcessedSize", () -> {
+ SnapshotFutureTask task = currentSnapshotTask();
+
+ return task == null ? -1 : task.processedSize();
+ }, "Processed size of current cluster snapshot in bytes on this
node.");
+
restoreCacheGrpProc.registerMetrics();
cctx.exchange().registerExchangeAwareComponent(this);
@@ -1825,6 +1837,21 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
}
+    /**
+     * Looks up the local task of the cluster snapshot that is currently in progress.
+     *
+     * @return Task of the current cluster snapshot, or {@code null} if there is no active cluster snapshot
+     *      request or the local task registered under its name is not a {@link SnapshotFutureTask}.
+     */
+    private SnapshotFutureTask currentSnapshotTask() {
+        // Read the request field once so that the null check and the lookup use the same request.
+        SnapshotOperationRequest curReq = clusterSnpReq;
+
+        AbstractSnapshotFutureTask<?> locTask = curReq == null ? null : locSnpTasks.get(curReq.snapshotName());
+
+        return locTask instanceof SnapshotFutureTask ? (SnapshotFutureTask)locTask : null;
+    }
+
/**
* @param factory Factory which produces {@link LocalSnapshotSender}
implementation.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 81d5f52644d..bdc26c09265 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -141,6 +142,12 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
/** Flag indicates that task already scheduled on checkpoint. */
private final AtomicBoolean started = new AtomicBoolean();
+ /** Estimated snapshot size in bytes. The value may grow during snapshot
creation. */
+ private final AtomicLong totalSize = new AtomicLong();
+
+ /** Processed snapshot size in bytes. */
+ private final AtomicLong processedSize = new AtomicLong();
+
/**
* @param cctx Shared context.
* @param srcNodeId Node id which cause snapshot task creation.
@@ -484,6 +491,8 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
Long partLen = partFileLengths.get(pair);
+ totalSize.addAndGet(partLen);
+
CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
wrapExceptionIfStarted(() -> {
snpSndr.sendPart(
@@ -494,6 +503,8 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
// Stop partition writer.
partDeltaWriters.get(pair).markPartitionProcessed();
+
+ processedSize.addAndGet(partLen);
}),
snpSndr.executor())
// Wait for the completion of both futures -
checkpoint end, copy partition.
@@ -512,6 +523,8 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
snpSndr.sendDelta(delta, cacheDirName, pair);
+ processedSize.addAndGet(delta.length());
+
boolean deleted = delta.delete();
assert deleted;
@@ -596,6 +609,16 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
return closeFut;
}
+    /**
+     * @return Estimated size of the snapshot in bytes. The value may grow while the snapshot is being created.
+     */
+    public long totalSize() {
+        return totalSize.longValue();
+    }
+
+    /**
+     * @return Size of the snapshot data processed so far, in bytes.
+     */
+    public long processedSize() {
+        return processedSize.longValue();
+    }
+
/** {@inheritDoc} */
@Override public boolean cancel() {
super.cancel();
@@ -930,7 +953,9 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<Set<GroupPartitionId
}
// Write buffer to the end of the file.
- deltaFileIo.writeFully(pageBuf);
+ int len = deltaFileIo.writeFully(pageBuf);
+
+ totalSize.addAndGet(len);
}
/** {@inheritDoc} */
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreMetricsTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
similarity index 69%
rename from
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreMetricsTest.java
rename to
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
index b29c5a97893..b2bad4e8b3b 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreMetricsTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotMetricsTest.java
@@ -19,29 +19,41 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.io.FilenameFilter;
+import java.io.Serializable;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanException;
import javax.management.ReflectionException;
+import org.apache.commons.io.FileUtils;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.After;
@@ -50,12 +62,14 @@ import org.junit.Test;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_TRANSFER_RATE_DMS_KEY;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.SNAPSHOT_RESTORE_METRICS;
/**
- * Tests snapshot restore metrics.
+ * Tests snapshot create/restore metrics.
*/
-public class IgniteClusterSnapshotRestoreMetricsTest extends
IgniteClusterSnapshotRestoreBaseTest {
+public class IgniteClusterSnapshotMetricsTest extends
IgniteClusterSnapshotRestoreBaseTest {
/** Separate working directory prefix. */
private static final String DEDICATED_DIR_PREFIX = "dedicated-";
@@ -99,7 +113,7 @@ public class IgniteClusterSnapshotRestoreMetricsTest extends
IgniteClusterSnapsh
@Override public void beforeTestSnapshot() throws Exception {
super.beforeTestSnapshot();
- cleanuo();
+ cleanup();
}
/** {@inheritDoc} */
@@ -107,13 +121,13 @@ public class IgniteClusterSnapshotRestoreMetricsTest
extends IgniteClusterSnapsh
@Override public void afterTestSnapshot() throws Exception {
super.afterTestSnapshot();
- cleanuo();
+ cleanup();
}
/** @throws Exception If fails. */
@Test
public void testRestoreSnapshotProgress() throws Exception {
- // Caches with differebt partition distribution.
+ // Caches with different partition distribution.
CacheConfiguration<Integer, Object> ccfg1 =
cacheConfig("cache1").setBackups(0);
CacheConfiguration<Integer, Object> ccfg2 =
cacheConfig("cache2").setCacheMode(CacheMode.REPLICATED);
@@ -212,6 +226,87 @@ public class IgniteClusterSnapshotRestoreMetricsTest
extends IgniteClusterSnapsh
}
}
+    /**
+     * Checks the {@code CurrentSnapshotTotalSize} and {@code CurrentSnapshotProcessedSize} metrics while a
+     * cluster snapshot is created under concurrent cache load:
+     * <ul>
+     *     <li>both metrics report {@code -1} when no cluster snapshot is running;</li>
+     *     <li>sampled values change and never decrease during the snapshot;</li>
+     *     <li>the processed size never exceeds the total size.</li>
+     * </ul>
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testCreateSnapshotProgress() throws Exception {
+        CacheConfiguration<Integer, Object> ccfg1 = cacheConfig("cache1");
+
+        IgniteEx ignite = startGridsWithCache(DEDICATED_CNT, CACHE_KEYS_RANGE, key -> new Account(key, key), ccfg1);
+
+        MetricRegistry mreg = ignite.context().metric().registry(SNAPSHOT_METRICS);
+
+        LongMetric totalSize = mreg.findMetric("CurrentSnapshotTotalSize");
+        LongMetric processedSize = mreg.findMetric("CurrentSnapshotProcessedSize");
+
+        // No cluster snapshot is running yet.
+        assertEquals(-1, totalSize.value());
+        assertEquals(-1, processedSize.value());
+
+        // Calculate transfer rate limit from the size of the node's persistent store, so that the snapshot
+        // runs long enough to sample the metrics several times (assumes the property limits bytes per second).
+        PdsFolderSettings<?> folderSettings = ignite.context().pdsFolderResolver().resolveFolders();
+        File storeWorkDir = new File(folderSettings.persistentStoreRootPath(), folderSettings.folderName());
+
+        long rate = FileUtils.sizeOfDirectory(storeWorkDir) / 5;
+
+        // Limit snapshot transfer rate.
+        DistributedChangeableProperty<Serializable> rateProp =
+            ignite.context().distributedConfiguration().property(SNAPSHOT_TRANSFER_RATE_DMS_KEY);
+
+        rateProp.propagate(rate);
+
+        // Start cluster snapshot.
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        // Run load concurrently with the snapshot; updates made during the snapshot are expected to grow the total size.
+        IgniteInternalFuture<?> loadFut = GridTestUtils.runAsync(() -> {
+            IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(ccfg1);
+
+            while (!fut.isDone()) {
+                Integer key = ThreadLocalRandom.current().nextInt(CACHE_KEYS_RANGE);
+                Account val = new Account(ThreadLocalRandom.current().nextInt(), ThreadLocalRandom.current().nextInt());
+
+                cache.put(key, val);
+            }
+        });
+
+        List<Long> totalVals = new ArrayList<>();
+        List<Long> processedVals = new ArrayList<>();
+
+        // Store metrics values during cluster snapshot.
+        while (!fut.isDone()) {
+            // Read the processed size first. The task only ever increases both counters, so a total read
+            // afterwards cannot be smaller than the processed size observed before it.
+            long processed = processedSize.value();
+            long total = totalSize.value();
+
+            // Record exactly the values that were checked. Re-reading the metrics here could observe -1 if the
+            // snapshot completes between the check and the read, which would break the assertions below.
+            if (total != -1 && processed != -1) {
+                totalVals.add(total);
+                processedVals.add(processed);
+            }
+
+            U.sleep(500);
+        }
+
+        fut.get(getTestTimeout());
+
+        loadFut.get();
+
+        assertTrue("Expected distinct values: " + totalVals,
+            totalVals.stream().mapToLong(v -> v).distinct().count() > 1);
+        assertTrue("Expected distinct values: " + processedVals,
+            processedVals.stream().mapToLong(v -> v).distinct().count() > 1);
+
+        assertTrue("Expected sorted values: " + totalVals,
+            F.isSorted(totalVals.stream().mapToLong(v -> v).toArray()));
+        assertTrue("Expected sorted values: " + processedVals,
+            F.isSorted(processedVals.stream().mapToLong(v -> v).toArray()));
+
+        for (int i = 0; i < totalVals.size(); i++) {
+            assertTrue("Total size less than processed [total=" + totalVals + ", processed=" + processedVals + ']',
+                processedVals.get(i) <= totalVals.get(i));
+        }
+
+        // The cluster snapshot is finished, so both metrics report -1 again.
+        assertEquals(-1, totalSize.value());
+        assertEquals(-1, processedSize.value());
+    }
+
/**
* @throws Exception If failed.
*/
@@ -245,7 +340,7 @@ public class IgniteClusterSnapshotRestoreMetricsTest
extends IgniteClusterSnapsh
/**
* @throws Exception If failed.
*/
- private void cleanuo() throws Exception {
+ private void cleanup() throws Exception {
FilenameFilter filter = (file, name) -> file.isDirectory() &&
name.startsWith(DEDICATED_DIR_PREFIX);
for (File file : new File(U.defaultWorkDirectory()).listFiles(filter))
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
index 40d66543522..be36d972ca1 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java
@@ -18,7 +18,7 @@
package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreMetricsTest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotMetricsTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreWithIndexingTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
import org.junit.runner.RunWith;
@@ -30,7 +30,7 @@ import org.junit.runners.Suite;
IgniteClusterSnapshotWithIndexesTest.class,
IgniteClusterSnapshotCheckWithIndexesTest.class,
IgniteClusterSnapshotRestoreWithIndexingTest.class,
- IgniteClusterSnapshotRestoreMetricsTest.class
+ IgniteClusterSnapshotMetricsTest.class
})
public class IgniteSnapshotWithIndexingTestSuite {
}