This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a0d074d02b3 IGNITE-20627 Added number of partitions processed by the
index worker to the output of the index commands. (#10993)
a0d074d02b3 is described below
commit a0d074d02b3549f811b794aa3e9ec92645c84972
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu Oct 26 17:11:32 2023 +0300
IGNITE-20627 Added number of partitions processed by the index worker to
the output of the index commands. (#10993)
---
.../GridCommandHandlerIndexForceRebuildTest.java | 38 ++++---
.../GridCommandHandlerIndexRebuildStatusTest.java | 125 ++++++++++++++++++---
.../java/org/apache/ignite/cache/CacheMetrics.java | 6 +
.../management/cache/IndexForceRebuildTask.java | 4 +-
.../cache/IndexRebuildStatusInfoContainer.java | 42 ++++++-
.../management/cache/IndexRebuildStatusTask.java | 9 +-
.../processors/cache/CacheGroupMetricsImpl.java | 24 +---
.../processors/cache/CacheMetricsImpl.java | 27 +++++
.../processors/cache/CacheMetricsSnapshot.java | 5 +
.../processors/cache/CacheMetricsSnapshotV2.java | 13 +++
.../processors/metric/impl/IntMetricImpl.java | 5 +
.../schema/SchemaIndexCachePartitionWorker.java | 4 +-
.../query/schema/SchemaIndexCacheVisitorImpl.java | 2 +-
.../platform/PlatformCacheWriteMetricsTask.java | 5 +
14 files changed, 246 insertions(+), 63 deletions(-)
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
index ee269d7c4e7..ce9f0658de5 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -43,6 +44,7 @@ import
org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import
org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
import
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
import
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
@@ -729,20 +731,24 @@ public class GridCommandHandlerIndexForceRebuildTest
extends GridCommandHandlerA
/**
* Makes formatted text for given caches.
*
+ * @param header Output header.
* @param cacheGroputToNames Cache groups mapping to non-existing cache
names.
- * @return Text for CLI print output for given caches.
+ * @return CLI output pattern for given caches.
*/
- private static String makeStringListForCacheGroupsAndNames(Map<String,
List<String>> cacheGroputToNames) {
- SB sb = new SB();
+ private static Pattern makePatternForCacheGroupsAndNames(String header,
Map<String, List<String>> cacheGroputToNames) {
+ GridStringBuilder sb = new SB(header).a("\\n");
for (Map.Entry<String, List<String>> entry :
cacheGroputToNames.entrySet()) {
String cacheGrp = entry.getKey();
for (String cacheName : entry.getValue())
- sb.a(INDENT).a("groupName=").a(cacheGrp).a(",
cacheName=").a(cacheName).a(U.nl());
+ sb.a(INDENT)
+ .a("groupName=").a(cacheGrp)
+ .a(", cacheName=").a(cacheName)
+ .a(", indexBuildPartitionsLeftCount=(\\d+),
totalPartitionsCount=(\\d+), progress=(\\d+)%\\n");
}
- return sb.toString();
+ return Pattern.compile(sb.toString());
}
/**
@@ -752,13 +758,12 @@ public class GridCommandHandlerIndexForceRebuildTest
extends GridCommandHandlerA
* @param cacheGroputToNames Cache groups mapping to non-existing cache
names.
*/
private static void validateOutputIndicesRebuildingInProgress(String
outputStr, Map<String, List<String>> cacheGroputToNames) {
- String caches =
makeStringListForCacheGroupsAndNames(cacheGroputToNames);
-
- assertContains(
- log,
- outputStr,
- "WARNING: These caches have indexes rebuilding in progress:" +
U.nl() + caches
+ Pattern pattern = makePatternForCacheGroupsAndNames(
+ "WARNING: These caches have indexes rebuilding in progress:",
+ cacheGroputToNames
);
+
+ assertTrue(pattern.matcher(outputStr).find());
}
/**
@@ -768,13 +773,12 @@ public class GridCommandHandlerIndexForceRebuildTest
extends GridCommandHandlerA
* @param cacheGroputToNames Cache groups mapping to non-existing cache
names.
*/
private void validateOutputIndicesRebuildWasStarted(String outputStr,
Map<String, List<String>> cacheGroputToNames) {
- String caches =
makeStringListForCacheGroupsAndNames(cacheGroputToNames);
-
- assertContains(
- log,
- outputStr,
- "Indexes rebuild was started for these caches:" + U.nl() + caches
+ Pattern pattern = makePatternForCacheGroupsAndNames(
+ "Indexes rebuild was started for these caches:",
+ cacheGroputToNames
);
+
+ assertTrue(pattern.matcher(outputStr).find());
}
/**
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
index 9ed302c6bc7..80913e9f3eb 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
@@ -17,14 +17,21 @@
package org.apache.ignite.util;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -39,9 +46,11 @@ import
org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillThreeFieldsEntryCache;
import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.simpleIndexEntity;
@@ -90,6 +99,7 @@ public class GridCommandHandlerIndexRebuildStatusTest extends
GridCommandHandler
idxRebuildsStartedNum.set(0);
statusRequestingFinished.set(false);
+ BlockingSchemaIndexCacheVisitorClosure.rowIndexListener = null;
}
/** {@inheritDoc} */
@@ -143,22 +153,45 @@ public class GridCommandHandlerIndexRebuildStatusTest
extends GridCommandHandler
deleteIndexBin(getTestIgniteInstanceName(GRIDS_NUM - 2));
IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
- IgniteEx ignite1 = startGrid(GRIDS_NUM - 1);
+ startGrid(GRIDS_NUM - 1);
IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
- IgniteEx ignite2 = startGrid(GRIDS_NUM - 2);
+ startGrid(GRIDS_NUM - 2);
- final UUID id1 = ignite1.localNode().id();
- final UUID id2 = ignite2.localNode().id();
+ grid(0).cache("cache1").enableStatistics(true);
boolean allRebuildsStarted = GridTestUtils.waitForCondition(() ->
idxRebuildsStartedNum.get() == 6, 30_000);
assertTrue("Failed to wait for all indexes to start being rebuilt",
allRebuildsStarted);
assertEquals(EXIT_CODE_OK, execute(handler, "--cache",
"indexes_rebuild_status"));
+ checkResult(handler, 1, 2);
+
statusRequestingFinished.set(true);
- checkResult(handler, id1, id2);
+ CountDownLatch idxProgressBlockedLatch = new CountDownLatch(1);
+ CountDownLatch idxProgressUnblockedLatch = new CountDownLatch(1);
+
+ BlockingSchemaIndexCacheVisitorClosure.rowIndexListener = () -> {
+ if (isIndexRebuildInProgress("cache1")) {
+ try {
+ idxProgressBlockedLatch.countDown();
+
+ idxProgressUnblockedLatch.await(getTestTimeout(),
MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ };
+
+ assertTrue(idxProgressBlockedLatch.await(getTestTimeout(),
MILLISECONDS));
+
+ assertEquals(EXIT_CODE_OK, execute(handler, "--cache",
"indexes_rebuild_status"));
+
+ checkRebuildInProgressOutputFor("cache1");
+
+ idxProgressUnblockedLatch.countDown();
}
/**
@@ -192,7 +225,7 @@ public class GridCommandHandlerIndexRebuildStatusTest
extends GridCommandHandler
statusRequestingFinished.set(true);
- checkResult(handler, id1);
+ checkResult(handler, 2);
}
/**
@@ -214,26 +247,82 @@ public class GridCommandHandlerIndexRebuildStatusTest
extends GridCommandHandler
* in {@code handler} last operation result and in {@code testOut}.
*
* @param handler CommandHandler used to run command.
- * @param nodeIds Ids to check.
+ * @param nodeIdxs Indexes of the nodes to check.
*/
- private void checkResult(TestCommandHandler handler, UUID... nodeIds) {
+ private void checkResult(TestCommandHandler handler, int... nodeIdxs) {
String output = testOut.toString();
Map<UUID, Set<IndexRebuildStatusInfoContainer>> cmdResult =
handler.getLastOperationResult();
+
assertNotNull(cmdResult);
- assertEquals("Unexpected number of nodes in result", nodeIds.length,
cmdResult.size());
+ assertEquals("Unexpected number of nodes in result", nodeIdxs.length,
cmdResult.size());
+
+ for (int nodeIdx : nodeIdxs) {
+ Set<IndexRebuildStatusInfoContainer> cacheInfos =
cmdResult.get(grid(nodeIdx).localNode().id());
- for (UUID nodeId: nodeIds) {
- Set<IndexRebuildStatusInfoContainer> cacheInfos =
cmdResult.get(nodeId);
assertNotNull(cacheInfos);
assertEquals("Unexpected number of cacheInfos in result", 3,
cacheInfos.size());
- final String nodeStr = "node_id=" + nodeId + ", groupName=group1,
cacheName=cache2\n" +
- "node_id=" + nodeId + ", groupName=group2, cacheName=cache1\n"
+
- "node_id=" + nodeId + ", groupName=no_group,
cacheName=cache_no_group";
+ checkRebuildStartOutput(output, nodeIdx, "group1", "cache2");
+ checkRebuildStartOutput(output, nodeIdx, "group2", "cache1");
+ checkRebuildStartOutput(output, nodeIdx, "no_group",
"cache_no_group");
+ }
+ }
+
+ /** */
+ private void checkRebuildStartOutput(String output, int nodeIdx, String
grpName, String cacheName) {
+ IgniteEx ignite = grid(nodeIdx);
+
+ int locPartsCount =
ignite.context().cache().cache(cacheName).context().topology().localPartitions().size();
+
+ assertContains(
+ log,
+ output,
+ "node_id=" + ignite.localNode().id() +
+ ", groupName=" + grpName +
+ ", cacheName=" + cacheName +
+ ", indexBuildPartitionsLeftCount=" + locPartsCount +
+ ", totalPartitionsCount=" + locPartsCount +
+ ", progress=0%");
+ }
+
+ /** */
+ private void checkRebuildInProgressOutputFor(String cacheName) throws
Exception {
+ Matcher matcher = Pattern.compile(
+ "cacheName=" + cacheName + ",
indexBuildPartitionsLeftCount=(\\d+), totalPartitionsCount=(\\d+),
progress=(\\d+)%"
+ ).matcher(testOut.toString());
+
+ List<Integer> rebuildProgressStatuses = new ArrayList<>();
+ List<Integer> indexBuildPartitionsLeftCounts = new ArrayList<>();
+
+ while (matcher.find()) {
+
indexBuildPartitionsLeftCounts.add(Integer.parseInt(matcher.group(1)));
+
+ rebuildProgressStatuses.add(Integer.parseInt(matcher.group(3)));
+ }
+
+ assertTrue(rebuildProgressStatuses.stream().anyMatch(progress ->
progress > 0));
+
+ int cacheTotalRebuildingPartsCnt =
indexBuildPartitionsLeftCounts.stream().mapToInt(Integer::intValue).sum();
- assertContains(log, output, nodeStr);
+ assertTrue(waitForCondition(
+ () ->
grid(0).cache(cacheName).metrics().getIndexBuildPartitionsLeftCount() ==
cacheTotalRebuildingPartsCnt,
+ getTestTimeout())
+ );
+ }
+
+ /** */
+ private boolean isIndexRebuildInProgress(String cacheName) {
+ for (Ignite ignite : Ignition.allGrids()) {
+ GridCacheContext<Object, Object> cctx =
((IgniteEx)ignite).context().cache().cache(cacheName).context();
+
+ long parts =
cctx.cache().metrics0().getIndexBuildPartitionsLeftCount();
+
+ if (parts > 0 && parts < cctx.topology().partitions() / 2)
+ return true;
}
+
+ return false;
}
/**
@@ -256,6 +345,9 @@ public class GridCommandHandlerIndexRebuildStatusTest
extends GridCommandHandler
/** */
private SchemaIndexCacheVisitorClosure original;
+ /** */
+ public static Runnable rowIndexListener;
+
/**
* @param original Original.
*/
@@ -274,6 +366,9 @@ public class GridCommandHandlerIndexRebuildStatusTest
extends GridCommandHandler
statusRequestingFinished.set(true);
}
+ if (rowIndexListener != null)
+ rowIndexListener.run();
+
original.apply(row);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 15e8ac9e282..5ede7786373 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -735,4 +735,10 @@ public interface CacheMetrics {
* @return Number of keys processed during index rebuilding.
*/
public long getIndexRebuildKeysProcessed();
+
+ /**
+ * @return The number of partitions that remain to be processed to
complete indexing.
+ * Note that this metric includes backup partitions, which also
participate in index building on each node.
+ */
+ public int getIndexBuildPartitionsLeftCount();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java
index 74ce6c787d5..85479d93eeb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java
@@ -101,12 +101,12 @@ public class IndexForceRebuildTask extends
VisorMultiNodeTask<CacheIndexesForceR
Set<IndexRebuildStatusInfoContainer>
cachesWithRebuildingInProgress =
cachesCtxWithRebuildingInProgress.stream()
- .map(c -> new IndexRebuildStatusInfoContainer(c.config()))
+ .map(IndexRebuildStatusInfoContainer::new)
.collect(Collectors.toSet());
Set<IndexRebuildStatusInfoContainer> cachesWithStartedRebuild =
cachesToRebuild.stream()
- .map(c -> new IndexRebuildStatusInfoContainer(c.config()))
+ .map(IndexRebuildStatusInfoContainer::new)
.filter(c -> !cachesWithRebuildingInProgress.contains(c))
.collect(Collectors.toSet());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java
index 057ba7cdb1f..a2754c7e5ab 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java
@@ -23,6 +23,7 @@ import java.io.ObjectOutput;
import java.util.Comparator;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -42,6 +43,12 @@ public class IndexRebuildStatusInfoContainer extends
IgniteDataTransferObject {
/** Cache name. */
private String cacheName;
+ /** The number of local node partitions that remain to be processed to complete indexing. */
+ private int indexBuildPartitionsLeftCount;
+
+ /** Local partitions count. */
+ private int totalPartitionsCount;
+
/**
* Empty constructor required for Serializable.
*/
@@ -50,23 +57,31 @@ public class IndexRebuildStatusInfoContainer extends
IgniteDataTransferObject {
}
/** */
- public IndexRebuildStatusInfoContainer(CacheConfiguration cfg) {
- assert cfg != null;
+ public IndexRebuildStatusInfoContainer(GridCacheContext<?, ?> cctx) {
+ assert cctx != null;
+
+ CacheConfiguration<?, ?> cfg = cctx.config();
groupName = cfg.getGroupName() == null ? EMPTY_GROUP_NAME :
cfg.getGroupName();
cacheName = cfg.getName();
+ indexBuildPartitionsLeftCount =
cctx.cache().metrics0().getIndexBuildPartitionsLeftCount();
+ totalPartitionsCount = cctx.topology().localPartitions().size();
}
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws
IOException {
U.writeString(out, groupName);
U.writeString(out, cacheName);
+ out.writeInt(indexBuildPartitionsLeftCount);
+ out.writeInt(totalPartitionsCount);
}
/** {@inheritDoc} */
@Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
groupName = U.readString(in);
cacheName = U.readString(in);
+ indexBuildPartitionsLeftCount = in.readInt();
+ totalPartitionsCount = in.readInt();
}
/** {@inheritDoc} */
@@ -98,11 +113,32 @@ public class IndexRebuildStatusInfoContainer extends
IgniteDataTransferObject {
return cacheName;
}
+ /**
+ * @return Total local node partitions count.
+ */
+ public int totalPartitionsCount() {
+ return totalPartitionsCount;
+ }
+
+ /**
+ * @return The number of local node partitions that remain to be processed
to complete indexing.
+ */
+ public int indexBuildPartitionsLeftCount() {
+ return indexBuildPartitionsLeftCount;
+ }
+
/**
* @return default string object representation without {@code
IndexRebuildStatusInfoContainer} and brackets.
*/
@Override public String toString() {
- String dfltImpl = S.toString(IndexRebuildStatusInfoContainer.class,
this);
+ float progress = (float)(totalPartitionsCount -
indexBuildPartitionsLeftCount) / totalPartitionsCount;
+
+ String dfltImpl = S.toString(
+ IndexRebuildStatusInfoContainer.class,
+ this,
+ "progress",
+ (int)(Math.max(0, progress) * 100) + "%"
+ );
return
dfltImpl.substring(IndexRebuildStatusInfoContainer.class.getSimpleName().length()
+ 2,
dfltImpl.length() - 1);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java
index d50f07710ad..31111f8e198 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java
@@ -26,11 +26,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
@@ -99,15 +98,15 @@ public class IndexRebuildStatusTask extends
VisorMultiNodeTask<
@Override protected Set<IndexRebuildStatusInfoContainer> run(
@Nullable CacheIndexesRebuildStatusCommandArg arg
) throws IgniteException {
- Set<IgniteCache> rebuildIdxCaches =
+ Set<IgniteCacheProxy<?, ?>> rebuildIdxCaches =
ignite.context().cache().publicCaches().stream()
.filter(c -> !c.indexReadyFuture().isDone())
.collect(Collectors.toSet());
Set<IndexRebuildStatusInfoContainer> res = new HashSet<>();
- for (IgniteCache<?, ?> cache : rebuildIdxCaches)
- res.add(new
IndexRebuildStatusInfoContainer(cache.getConfiguration(CacheConfiguration.class)));
+ for (IgniteCacheProxy<?, ?> cache : rebuildIdxCaches)
+ res.add(new IndexRebuildStatusInfoContainer(cache.context()));
return res;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java
index eac03b8ecd3..6fbe537974e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java
@@ -55,9 +55,6 @@ public class CacheGroupMetricsImpl {
/** Cache group metrics prefix. */
public static final String CACHE_GROUP_METRICS_PREFIX = "cacheGroups";
- /** Number of partitions need processed for finished indexes create or
rebuilding. */
- private final AtomicLongMetric idxBuildCntPartitionsLeft;
-
/** Cache group context. */
private final CacheGroupContext ctx;
@@ -111,8 +108,11 @@ public class CacheGroupMetricsImpl {
() -> persistenceEnabled ? database().forGroupPageStores(ctx,
PageStore::getSparseSize) : 0,
"Storage space allocated for group adjusted for possible sparsity,
in bytes.");
- idxBuildCntPartitionsLeft =
mreg.longMetric("IndexBuildCountPartitionsLeft",
- "Number of partitions need processed for finished indexes create
or rebuilding.");
+ mreg.register(
+ "IndexBuildCountPartitionsLeft",
+ this::getIndexBuildCountPartitionsLeft,
+ "Number of partitions need processed for finished indexes create
or rebuilding."
+ );
initLocPartitionsNum =
mreg.longMetric("InitializedLocalPartitionsNumber",
"Number of local partitions initialized on current node.");
@@ -196,19 +196,7 @@ public class CacheGroupMetricsImpl {
/** */
public long getIndexBuildCountPartitionsLeft() {
- return idxBuildCntPartitionsLeft.value();
- }
-
- /** Add number of partitions need processed for finished indexes create or
rebuilding. */
- public void addIndexBuildCountPartitionsLeft(long
idxBuildCntPartitionsLeft) {
- this.idxBuildCntPartitionsLeft.add(idxBuildCntPartitionsLeft);
- }
-
- /**
- * Decrement number of partitions need processed for finished indexes
create or rebuilding.
- */
- public void decrementIndexBuildCountPartitionsLeft() {
- idxBuildCntPartitionsLeft.decrement();
+ return ctx.caches().stream().mapToLong(cctx ->
cctx.cache().metrics0().getIndexBuildPartitionsLeftCount()).sum();
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index be49398e4cf..80a27045abb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -34,6 +34,7 @@ import
org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
+import org.apache.ignite.internal.processors.metric.impl.IntMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.metric.impl.LongGauge;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
@@ -245,6 +246,9 @@ public class CacheMetricsImpl implements CacheMetrics {
/** Number of keys processed during index rebuilding. */
private final LongAdderMetric idxRebuildKeyProcessed;
+ /** The number of local node partitions that remain to be processed to
complete indexing. */
+ private final IntMetricImpl idxBuildPartitionsLeftCnt;
+
/**
* Creates cache metrics.
*
@@ -431,6 +435,9 @@ public class CacheMetricsImpl implements CacheMetrics {
idxRebuildKeyProcessed =
mreg.longAdderMetric("IndexRebuildKeyProcessed",
"Number of keys processed during the index rebuilding.");
+
+ idxBuildPartitionsLeftCnt =
mreg.intMetric("IndexBuildPartitionsLeftCount",
+ "The number of local node partitions that remain to be processed
to complete indexing.");
}
/**
@@ -1658,6 +1665,26 @@ public class CacheMetricsImpl implements CacheMetrics {
idxRebuildKeyProcessed.add(val);
}
+ /** */
+ public void decrementIndexBuildPartitionsLeftCount() {
+ idxBuildPartitionsLeftCnt.decrement();
+ }
+
+ /** */
+ public void addIndexBuildPartitionsLeftCount(int val) {
+ idxBuildPartitionsLeftCnt.add(val);
+ }
+
+ /** */
+ public void resetIndexBuildPartitionsLeftCount() {
+ idxBuildPartitionsLeftCnt.reset();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getIndexBuildPartitionsLeftCount() {
+ return idxBuildPartitionsLeftCnt.value();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsImpl.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 2d7f62c625b..e50a1810213 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -1046,6 +1046,11 @@ public class CacheMetricsSnapshot implements
CacheMetrics, Externalizable {
return idxRebuildKeyProcessed;
}
+ /** {@inheritDoc} */
+ @Override public int getIndexBuildPartitionsLeftCount() {
+ return 0;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsSnapshot.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
index f792499619f..3249381fd6b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java
@@ -329,6 +329,9 @@ public class CacheMetricsSnapshotV2 extends
IgniteDataTransferObject implements
/** Number of keys processed during index rebuilding. */
private long idxRebuildKeyProcessed;
+ /** The number of local node partitions that remain to be processed to
complete indexing. */
+ private int idxBuildPartitionsLeftCount;
+
/**
* Default constructor.
*/
@@ -442,6 +445,8 @@ public class CacheMetricsSnapshotV2 extends
IgniteDataTransferObject implements
idxRebuildInProgress = m.isIndexRebuildInProgress();
idxRebuildKeyProcessed = m.getIndexRebuildKeysProcessed();
+
+ idxBuildPartitionsLeftCount = m.getIndexBuildPartitionsLeftCount();
}
/**
@@ -587,6 +592,7 @@ public class CacheMetricsSnapshotV2 extends
IgniteDataTransferObject implements
keysToRebalanceLeft += e.getKeysToRebalanceLeft();
rebalancingBytesRate += e.getRebalancingBytesRate();
rebalancingKeysRate += e.getRebalancingKeysRate();
+ idxBuildPartitionsLeftCount +=
e.getIndexBuildPartitionsLeftCount();
}
int size = metrics.size();
@@ -1072,6 +1078,11 @@ public class CacheMetricsSnapshotV2 extends
IgniteDataTransferObject implements
return idxRebuildKeyProcessed;
}
+ /** {@inheritDoc} */
+ @Override public int getIndexBuildPartitionsLeftCount() {
+ return idxBuildPartitionsLeftCount;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsSnapshotV2.class, this);
@@ -1154,6 +1165,7 @@ public class CacheMetricsSnapshotV2 extends
IgniteDataTransferObject implements
out.writeInt(size);
out.writeInt(keySize);
U.writeLongString(out, txKeyCollisions);
+ out.writeInt(idxBuildPartitionsLeftCount);
}
/** {@inheritDoc} */
@@ -1233,5 +1245,6 @@ public class CacheMetricsSnapshotV2 extends
IgniteDataTransferObject implements
size = in.readInt();
keySize = in.readInt();
txKeyCollisions = U.readLongString(in);
+ idxBuildPartitionsLeftCount = in.readInt();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java
index 2c4927706cb..c5c6d995ad5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java
@@ -55,6 +55,11 @@ public class IntMetricImpl extends AbstractMetric implements
IntMetric {
add(1);
}
+ /** Adds -1 to the metric. */
+ public void decrement() {
+ add(-1);
+ }
+
/**
* Sets value.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
index e37fa54f8ba..222551cd8ef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -139,7 +139,7 @@ public class SchemaIndexCachePartitionWorker extends
GridWorker {
int cnt = partsCnt.getAndSet(0);
if (cnt > 0)
- cctx.group().metrics().addIndexBuildCountPartitionsLeft(-cnt);
+ cctx.cache().metrics0().resetIndexBuildPartitionsLeftCount();
}
finally {
fut.onDone(wrappedClo.indexCacheStat, err);
@@ -213,7 +213,7 @@ public class SchemaIndexCachePartitionWorker extends
GridWorker {
locPart.release();
if (partsCnt.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0)
-
cctx.group().metrics().decrementIndexBuildCountPartitionsLeft();
+
cctx.cache().metrics0().decrementIndexBuildPartitionsLeftCount();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index d00691f0fc4..241838f9a0e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -102,7 +102,7 @@ public class SchemaIndexCacheVisitorImpl implements
SchemaIndexCacheVisitor {
return;
}
-
cctx.group().metrics().addIndexBuildCountPartitionsLeft(locParts.size());
+
cctx.cache().metrics0().addIndexBuildPartitionsLeftCount(locParts.size());
cctx.cache().metrics0().resetIndexRebuildKeyProcessed();
beforeExecute();
diff --git
a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
index 6d9a557dc49..8b85f6be36e 100644
---
a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
+++
b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java
@@ -552,5 +552,10 @@ public class PlatformCacheWriteMetricsTask extends
ComputeTaskAdapter<Long, Obje
@Override public long getIndexRebuildKeysProcessed() {
return 0;
}
+
+ /** {@inheritDoc} */
+ @Override public int getIndexBuildPartitionsLeftCount() {
+ return 0;
+ }
}
}