This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 54e3aa4 IGNITE-12821 Added check-sizes parameter to validate_indexes
cmd
54e3aa4 is described below
commit 54e3aa421927c3fb07339b1ca769f2f1e21cd4a0
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Apr 14 16:48:51 2020 +0300
IGNITE-12821 Added check-sizes parameter to validate_indexes cmd
Signed-off-by: Slava Koptilin <[email protected]>
---
.../commandline/cache/CacheValidateIndexes.java | 137 ++++--
.../cache/argument/ValidateIndexesCommandArg.java | 9 +-
.../internal/dto/IgniteDataTransferObject.java | 3 +
.../processors/query/GridQueryIndexing.java | 5 +-
.../processors/query/GridQueryProcessor.java | 12 +-
.../query/schema/SchemaIndexCacheVisitorImpl.java | 12 +-
.../verify/ValidateIndexesCheckSizeIssue.java | 107 ++++
.../verify/ValidateIndexesCheckSizeResult.java | 101 ++++
.../verify/VisorValidateIndexesJobResult.java | 78 +--
.../visor/verify/VisorValidateIndexesTaskArg.java | 81 +--
.../main/resources/META-INF/classnames.properties | 2 +
.../util/GridCommandHandlerAbstractTest.java | 19 +-
...mandHandlerClusterByClassTest_cache_help.output | 3 +-
...dlerClusterByClassWithSSLTest_cache_help.output | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 4 +-
.../visor/verify/ValidateIndexesClosure.java | 524 ++++++++++++++++----
.../visor/verify/VisorValidateIndexesTask.java | 3 +-
...xingMultithreadedLoadContinuousRestartTest.java | 2 +-
.../db/LongDestroyDurableBackgroundTaskTest.java | 2 +-
.../processors/database/RebuildIndexTest.java | 2 +-
.../RebuildIndexWithHistoricalRebalanceTest.java | 2 +-
.../query/h2/GridIndexFullRebuildTest.java | 2 +-
...teCacheWithIndexingAndPersistenceTestSuite.java | 2 +
.../GridCommandHandlerIndexingCheckSizeTest.java | 547 +++++++++++++++++++++
...idCommandHandlerIndexingClusterByClassTest.java | 128 +----
.../util/GridCommandHandlerIndexingUtils.java | 297 +++++++++--
26 files changed, 1694 insertions(+), 393 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheValidateIndexes.java
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheValidateIndexes.java
index c98f28a..eef77c6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheValidateIndexes.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheValidateIndexes.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.commandline.cache;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Logger;
@@ -34,6 +35,8 @@ import
org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.verify.IndexIntegrityCheckIssue;
import org.apache.ignite.internal.visor.verify.IndexValidationIssue;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeIssue;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeResult;
import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult;
import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
import org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskArg;
@@ -41,6 +44,7 @@ import
org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskResult;
import static
org.apache.ignite.internal.commandline.CommandLogger.DOUBLE_INDENT;
import static org.apache.ignite.internal.commandline.CommandLogger.INDENT;
+import static org.apache.ignite.internal.commandline.CommandLogger.join;
import static org.apache.ignite.internal.commandline.CommandLogger.optional;
import static org.apache.ignite.internal.commandline.CommandLogger.or;
import static
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
@@ -53,6 +57,7 @@ import static
org.apache.ignite.internal.commandline.cache.argument.IdleVerifyCo
import static
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST;
import static
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH;
import static
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_CRC;
+import static
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_SIZES;
/**
* Validate indexes command.
@@ -74,9 +79,17 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
map.put(CHECK_FIRST + " N", "validate only the first N keys");
map.put(CHECK_THROUGH + " K", "validate every Kth key");
map.put(CHECK_CRC.toString(), "check the CRC-sum of pages stored on
disk");
-
- usageCache(logger, VALIDATE_INDEXES, description, map,
- optional(CACHES), OP_NODE_ID, optional(or(CHECK_FIRST + " N",
CHECK_THROUGH + " K")));
+ map.put(CHECK_SIZES.toString(), "check that index size and cache size
are the same");
+
+ usageCache(
+ logger,
+ VALIDATE_INDEXES,
+ description,
+ map,
+ optional(CACHES),
+ OP_NODE_ID,
+ optional(or(CHECK_FIRST + " N", CHECK_THROUGH + " K", CHECK_CRC,
CHECK_SIZES))
+ );
}
/**
@@ -84,33 +97,47 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
*/
public class Arguments {
/** Caches. */
- private Set<String> caches;
+ private final Set<String> caches;
/** Node id. */
- private UUID nodeId;
+ private final UUID nodeId;
/** Max number of entries to be checked. */
- private int checkFirst = -1;
+ private final int checkFirst;
/** Number of entries to check through. */
- private int checkThrough = -1;
+ private final int checkThrough;
+
+ /** Check CRC. */
+ private final boolean checkCrc;
- /** Check CRC */
- private boolean checkCrc;
+ /** Check that index size and cache size are same. */
+ private final boolean checkSizes;
/**
- * @param caches Caches to validate.
- * @param nodeId Node Id.
- * @param checkFirst Max number of entries to be checked..
+ * Constructor.
+ *
+ * @param caches Caches.
+ * @param nodeId Node id.
+ * @param checkFirst Max number of entries to be checked.
* @param checkThrough Number of entries to check through.
* @param checkCrc Check CRC.
+ * @param checkSizes Check that index size and cache size are same.
*/
- public Arguments(Set<String> caches, UUID nodeId, int checkFirst, int
checkThrough, boolean checkCrc) {
+ public Arguments(
+ Set<String> caches,
+ UUID nodeId,
+ int checkFirst,
+ int checkThrough,
+ boolean checkCrc,
+ boolean checkSizes
+ ) {
this.caches = caches;
this.nodeId = nodeId;
this.checkFirst = checkFirst;
this.checkThrough = checkThrough;
this.checkCrc = checkCrc;
+ this.checkSizes = checkSizes;
}
/**
@@ -128,13 +155,6 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
}
/**
- * @return Check CRC
- */
- public boolean checkCrc() {
- return checkCrc;
- }
-
- /**
* @return Number of entries to check through.
*/
public int checkThrough() {
@@ -147,6 +167,23 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
public UUID nodeId() {
return nodeId;
}
+
+ /**
+ * @return Check CRC.
+ */
+ public boolean checkCrc() {
+ return checkCrc;
+ }
+
+ /**
+ * Returns whether to check that index size and cache size are same.
+ *
+ * @return {@code true} if need check that index size and cache size
+ * are same.
+ */
+ public boolean checkSizes() {
+ return checkSizes;
+ }
}
/** Command parsed arguments. */
@@ -164,7 +201,8 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
args.nodeId() != null ? Collections.singleton(args.nodeId()) :
null,
args.checkFirst(),
args.checkThrough(),
- args.checkCrc()
+ args.checkCrc(),
+ args.checkSizes()
);
try (GridClient client = Command.startClient(clientCfg)) {
@@ -173,46 +211,53 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
boolean errors = CommandLogger.printErrors(taskRes.exceptions(),
"Index validation failed on nodes:", logger);
- for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry :
taskRes.results().entrySet()) {
- if (!nodeEntry.getValue().hasIssues())
+ for (Entry<UUID, VisorValidateIndexesJobResult> nodeEntry :
taskRes.results().entrySet()) {
+ VisorValidateIndexesJobResult jobRes = nodeEntry.getValue();
+
+ if (!jobRes.hasIssues())
continue;
errors = true;
logger.info("Index issues found on node " + nodeEntry.getKey()
+ ":");
- Collection<IndexIntegrityCheckIssue> integrityCheckFailures =
nodeEntry.getValue().integrityCheckFailures();
+ for (IndexIntegrityCheckIssue is :
jobRes.integrityCheckFailures())
+ logger.info(INDENT + is);
- if (!integrityCheckFailures.isEmpty()) {
- for (IndexIntegrityCheckIssue is : integrityCheckFailures)
- logger.info(INDENT + is);
- }
-
- Map<PartitionKey, ValidateIndexesPartitionResult> partRes =
nodeEntry.getValue().partitionResult();
-
- for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e
: partRes.entrySet()) {
+ for (Entry<PartitionKey, ValidateIndexesPartitionResult> e :
jobRes.partitionResult().entrySet()) {
ValidateIndexesPartitionResult res = e.getValue();
if (!res.issues().isEmpty()) {
- logger.info(INDENT + CommandLogger.join(" ",
e.getKey(), e.getValue()));
+ logger.info(INDENT + join(" ", e.getKey(),
e.getValue()));
for (IndexValidationIssue is : res.issues())
logger.info(DOUBLE_INDENT + is);
}
}
- Map<String, ValidateIndexesPartitionResult> idxRes =
nodeEntry.getValue().indexResult();
-
- for (Map.Entry<String, ValidateIndexesPartitionResult> e :
idxRes.entrySet()) {
+ for (Entry<String, ValidateIndexesPartitionResult> e :
jobRes.indexResult().entrySet()) {
ValidateIndexesPartitionResult res = e.getValue();
if (!res.issues().isEmpty()) {
- logger.info(INDENT + CommandLogger.join(" ", "SQL
Index", e.getKey(), e.getValue()));
+ logger.info(INDENT + join(" ", "SQL Index",
e.getKey(), e.getValue()));
for (IndexValidationIssue is : res.issues())
logger.info(DOUBLE_INDENT + is);
}
}
+
+ for (Entry<String, ValidateIndexesCheckSizeResult> e :
jobRes.checkSizeResult().entrySet()) {
+ ValidateIndexesCheckSizeResult res = e.getValue();
+ Collection<ValidateIndexesCheckSizeIssue> issues =
res.issues();
+
+ if (issues.isEmpty())
+ continue;
+
+ logger.info(INDENT + join(" ", "Size check", e.getKey(),
res));
+
+ for (ValidateIndexesCheckSizeIssue issue : issues)
+ logger.info(DOUBLE_INDENT + issue);
+ }
}
if (!errors)
@@ -230,18 +275,15 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
@Override public void parseArguments(CommandArgIterator argIter) {
int checkFirst = -1;
int checkThrough = -1;
- boolean checkCrc = false;
UUID nodeId = null;
Set<String> caches = null;
+ boolean checkCrc = false;
+ boolean checkSizes = false;
while (argIter.hasNextSubArg()) {
String nextArg = argIter.nextArg("");
ValidateIndexesCommandArg arg = CommandArgUtils.of(nextArg,
ValidateIndexesCommandArg.class);
- if (arg == CHECK_CRC) {
- checkCrc = true;
- continue;
- }
if (arg == CHECK_FIRST || arg == CHECK_THROUGH) {
if (!argIter.hasNextSubArg())
@@ -270,6 +312,15 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
continue;
}
+ else if (arg == CHECK_CRC) {
+ checkCrc = true;
+ continue;
+ }
+ else if (CHECK_SIZES == arg) {
+ checkSizes = true;
+
+ continue;
+ }
try {
nodeId = UUID.fromString(nextArg);
@@ -283,7 +334,7 @@ public class CacheValidateIndexes implements
Command<CacheValidateIndexes.Argume
caches = argIter.parseStringSet(nextArg);
}
- args = new Arguments(caches, nodeId, checkFirst, checkThrough,
checkCrc);
+ args = new Arguments(caches, nodeId, checkFirst, checkThrough,
checkCrc, checkSizes);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/argument/ValidateIndexesCommandArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/argument/ValidateIndexesCommandArg.java
index f3b4284..7bc4caf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/argument/ValidateIndexesCommandArg.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/argument/ValidateIndexesCommandArg.java
@@ -27,11 +27,14 @@ public enum ValidateIndexesCommandArg implements CommandArg
{
/** Check first. */
CHECK_FIRST("--check-first"),
- /** Check crc. */
+ /** Check through. */
+ CHECK_THROUGH("--check-through"),
+
+ /** Check CRC. */
CHECK_CRC("--check-crc"),
- /** Check through. */
- CHECK_THROUGH("--check-through");
+ /** Check sizes. */
+ CHECK_SIZES("--check-sizes");
/** Option name. */
private final String name;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
index bdfe7c2..f14a527 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
@@ -56,6 +56,9 @@ public abstract class IgniteDataTransferObject implements
Externalizable {
/** Version 6. */
protected static final byte V6 = 6;
+ /** Version 7. */
+ protected static final byte V7 = 7;
+
/**
* @param col Source collection.
* @param <T> Collection type.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 3e07680..cd18553 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -465,13 +465,14 @@ public interface GridQueryIndexing {
Collection<ColumnInformation> columnsInformation(String schemaNamePtrn,
String tblNamePtrn, String colNamePtrn);
/**
- * Return index size by schema name and index name.
+ * Return index size by schema, table and index name.
*
* @param schemaName Schema name.
+ * @param tblName Table name.
* @param idxName Index name.
* @return Index size (Number of elements) or {@code 0} if index not found.
*/
- default long indexSize(String schemaName, String idxName) throws
IgniteCheckedException {
+ default long indexSize(String schemaName, String tblName, String idxName)
throws IgniteCheckedException {
return 0;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 45f2e0d..4b71aa2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -123,6 +124,7 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.newSetFromMap;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -189,7 +191,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
private ClusterNode crd;
/** Registered cache names. */
- private final Collection<String> cacheNames =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private final Collection<String> cacheNames = newSetFromMap(new
ConcurrentHashMap<String, Boolean>());
/** ID history for index create/drop discovery messages. */
private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist
=
@@ -215,7 +217,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
private boolean skipFieldLookup;
/** Cache name - value typeId pairs for which type mismatch message was
logged. */
- private final Set<Long> missedCacheTypes = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ private final Set<Long> missedCacheTypes = newSetFromMap(new
ConcurrentHashMap<>());
/**
* @param ctx Kernal context.
@@ -2906,13 +2908,11 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
* @return Descriptors.
*/
public Collection<GridQueryTypeDescriptor> types(@Nullable String
cacheName) {
- Collection<GridQueryTypeDescriptor> cacheTypes = new ArrayList<>();
+ Collection<GridQueryTypeDescriptor> cacheTypes = newSetFromMap(new
IdentityHashMap<>());
for (Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl> e :
types.entrySet()) {
- QueryTypeDescriptorImpl desc = e.getValue();
-
if (F.eq(e.getKey().cacheName(), cacheName))
- cacheTypes.add(desc);
+ cacheTypes.add(e.getValue());
}
return cacheTypes;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 7238d96..09163b8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.schema;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -180,15 +179,14 @@ public class SchemaIndexCacheVisitorImpl implements
SchemaIndexCacheVisitor {
res.a(" Type name=" + type.name());
res.a(U.nl());
- final String pk = "_key_PK";
+ String pk = "_key_PK";
+ String tblName = type.tableName();
- res.a(" Index: name=" + pk + ", size=" +
idx.indexSize(type.schemaName(), pk));
+ res.a(" Index: name=" + pk + ", size=" +
idx.indexSize(type.schemaName(), tblName, pk));
res.a(U.nl());
- final Map<String, GridQueryIndexDescriptor> indexes =
type.indexes();
-
- for (GridQueryIndexDescriptor descriptor : indexes.values()) {
- final long size = idx.indexSize(type.schemaName(),
descriptor.name());
+ for (GridQueryIndexDescriptor descriptor :
type.indexes().values()) {
+ long size = idx.indexSize(type.schemaName(), tblName,
descriptor.name());
res.a(" Index: name=" + descriptor.name() + ",
size=" + size);
res.a(U.nl());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesCheckSizeIssue.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesCheckSizeIssue.java
new file mode 100644
index 0000000..b87f2a6
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesCheckSizeIssue.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.verify;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.util.IgniteUtils.readLongString;
+import static org.apache.ignite.internal.util.IgniteUtils.writeLongString;
+
+/**
+ * Issue when checking size of cache and index.
+ */
+public class ValidateIndexesCheckSizeIssue extends IgniteDataTransferObject {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Index name. */
+ private String idxName;
+
+ /** Index size. */
+ private long idxSize;
+
+ /** Error. */
+ @GridToStringExclude
+ private Throwable t;
+
+ /**
+ * Default constructor.
+ */
+ public ValidateIndexesCheckSizeIssue() {
+ //Default constructor required for Externalizable.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param idxName Index name.
+ * @param idxSize Index size.
+ * @param t Error.
+ */
+ public ValidateIndexesCheckSizeIssue(@Nullable String idxName, long
idxSize, @Nullable Throwable t) {
+ this.idxName = idxName;
+ this.idxSize = idxSize;
+ this.t = t;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
+ writeLongString(out, idxName);
+ out.writeLong(idxSize);
+ out.writeObject(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
+ idxName = readLongString(in);
+ idxSize = in.readLong();
+ t = (Throwable)in.readObject();
+ }
+
+ /**
+ * Return index size.
+ *
+ * @return Index size.
+ */
+ public long indexSize() {
+ return idxSize;
+ }
+
+ /**
+ * Return error.
+ *
+ * @return Error.
+ */
+ public Throwable error() {
+ return t;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ValidateIndexesCheckSizeIssue.class, this, "err", t);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesCheckSizeResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesCheckSizeResult.java
new file mode 100644
index 0000000..c610163
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesCheckSizeResult.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.verify;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static java.util.Collections.emptyList;
+import static org.apache.ignite.internal.util.IgniteUtils.readCollection;
+import static org.apache.ignite.internal.util.IgniteUtils.writeCollection;
+
+/**
+ * Result of checking size cache and index.
+ */
+public class ValidateIndexesCheckSizeResult extends IgniteDataTransferObject {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache size. */
+ private long cacheSize;
+
+ /** Issues. */
+ @GridToStringExclude
+ private Collection<ValidateIndexesCheckSizeIssue> issues;
+
+ /**
+ * Default constructor.
+ */
+ public ValidateIndexesCheckSizeResult() {
+ //Default constructor required for Externalizable.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheSize Cache size.
+ * @param issues Issues.
+ */
+ public ValidateIndexesCheckSizeResult(long cacheSize,
Collection<ValidateIndexesCheckSizeIssue> issues) {
+ this.cacheSize = cacheSize;
+ this.issues = issues;
+ }
+
+ /**
+ * Return issues when checking size of cache and index.
+ *
+ * @return Issues when checking size of cache and index.
+ */
+ public Collection<ValidateIndexesCheckSizeIssue> issues() {
+ return issues == null ? emptyList() : issues;
+ }
+
+ /**
+ * Return cache size.
+ *
+ * @return Cache size.
+ */
+ public long cacheSize() {
+ return cacheSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
+ out.writeLong(cacheSize);
+ writeCollection(out, issues);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
+ cacheSize = in.readLong();
+ issues = readCollection(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ValidateIndexesCheckSizeResult.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
index 99bccbd..4b056d8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
@@ -21,19 +21,24 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.visor.VisorDataTransferObject;
-import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static org.apache.ignite.internal.util.IgniteUtils.readCollection;
+import static org.apache.ignite.internal.util.IgniteUtils.readMap;
+import static org.apache.ignite.internal.util.IgniteUtils.writeCollection;
+import static org.apache.ignite.internal.util.IgniteUtils.writeMap;
/**
*
*/
-public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
+public class VisorValidateIndexesJobResult extends IgniteDataTransferObject {
/** */
private static final long serialVersionUID = 0L;
@@ -49,19 +54,28 @@ public class VisorValidateIndexesJobResult extends
VisorDataTransferObject {
@GridToStringInclude
private Collection<IndexIntegrityCheckIssue> integrityCheckFailures;
+ /** Results of checking size cache and index. */
+ @GridToStringInclude
+ private Map<String, ValidateIndexesCheckSizeResult> checkSizeRes;
+
/**
+ * Constructor.
+ *
* @param partRes Results of indexes validation from node.
* @param idxRes Results of reverse indexes validation from node.
- * @param integrityCheckFailures Collection of indexes integrity check
failures.
+ * @param integrityCheckFailures Collection of indexes integrity check
failures.
+ * @param checkSizeRes Results of checking size cache and index.
*/
public VisorValidateIndexesJobResult(
- @NotNull Map<PartitionKey, ValidateIndexesPartitionResult> partRes,
- @NotNull Map<String, ValidateIndexesPartitionResult> idxRes,
- @NotNull Collection<IndexIntegrityCheckIssue>
integrityCheckFailures
+ Map<PartitionKey, ValidateIndexesPartitionResult> partRes,
+ @Nullable Map<String, ValidateIndexesPartitionResult> idxRes,
+ @Nullable Collection<IndexIntegrityCheckIssue> integrityCheckFailures,
+ @Nullable Map<String, ValidateIndexesCheckSizeResult> checkSizeRes
) {
this.partRes = partRes;
this.idxRes = idxRes;
this.integrityCheckFailures = integrityCheckFailures;
+ this.checkSizeRes = checkSizeRes;
}
/**
@@ -70,11 +84,6 @@ public class VisorValidateIndexesJobResult extends
VisorDataTransferObject {
public VisorValidateIndexesJobResult() {
}
- /** {@inheritDoc} */
- @Override public byte getProtocolVersion() {
- return V3;
- }
-
/**
* @return Results of indexes validation from node.
*/
@@ -86,14 +95,23 @@ public class VisorValidateIndexesJobResult extends
VisorDataTransferObject {
* @return Results of reverse indexes validation from node.
*/
public Map<String, ValidateIndexesPartitionResult> indexResult() {
- return idxRes == null ? Collections.emptyMap() : idxRes;
+ return idxRes == null ? emptyMap() : idxRes;
}
/**
* @return Collection of failed integrity checks.
*/
public Collection<IndexIntegrityCheckIssue> integrityCheckFailures() {
- return integrityCheckFailures == null ? Collections.emptyList() :
integrityCheckFailures;
+ return integrityCheckFailures == null ? emptyList() :
integrityCheckFailures;
+ }
+
+ /**
+ * Return results of checking size cache and index.
+ *
+ * @return Results of checking size cache and index.
+ */
+ public Map<String, ValidateIndexesCheckSizeResult> checkSizeResult() {
+ return checkSizeRes == null ? emptyMap() : checkSizeRes;
}
/**
@@ -101,26 +119,28 @@ public class VisorValidateIndexesJobResult extends
VisorDataTransferObject {
*/
public boolean hasIssues() {
return (integrityCheckFailures != null &&
!integrityCheckFailures.isEmpty()) ||
- (partRes != null && partRes.entrySet().stream().anyMatch(e ->
!e.getValue().issues().isEmpty())) ||
- (idxRes != null && idxRes.entrySet().stream().anyMatch(e ->
!e.getValue().issues().isEmpty()));
+ (partRes != null && partRes.entrySet().stream().anyMatch(e ->
!e.getValue().issues().isEmpty())) ||
+ (idxRes != null && idxRes.entrySet().stream().anyMatch(e ->
!e.getValue().issues().isEmpty())) ||
+ (checkSizeRes != null &&
checkSizeRes.entrySet().stream().anyMatch(e ->
!e.getValue().issues().isEmpty()));
}
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws
IOException {
- U.writeMap(out, partRes);
- U.writeMap(out, idxRes);
- U.writeCollection(out, integrityCheckFailures);
+ writeMap(out, partRes);
+ writeMap(out, idxRes);
+ writeCollection(out, integrityCheckFailures);
+ writeMap(out, checkSizeRes);
}
/** {@inheritDoc} */
- @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
- partRes = U.readMap(in);
-
- if (protoVer >= V2)
- idxRes = U.readMap(in);
-
- if (protoVer >= V3)
- integrityCheckFailures = U.readCollection(in);
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
+ partRes = readMap(in);
+ idxRes = readMap(in);
+ integrityCheckFailures = readCollection(in);
+ checkSizeRes = readMap(in);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTaskArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTaskArg.java
index 0bddb9c..02c28b4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTaskArg.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTaskArg.java
@@ -24,7 +24,9 @@ import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.util.IgniteUtils.readSet;
+import static org.apache.ignite.internal.util.IgniteUtils.writeCollection;
/**
*
@@ -37,17 +39,20 @@ public class VisorValidateIndexesTaskArg extends
IgniteDataTransferObject {
private Set<String> caches;
/** Check first K elements. */
- private int checkFirst;
-
- /** Check CRC */
- private boolean checkCrc;
+ private int checkFirst = -1;
/** Check through K element (skip K-1, check Kth). */
- private int checkThrough;
+ private int checkThrough = -1;
/** Nodes on which task will run. */
private Set<UUID> nodes;
+ /** Check CRC. */
+ private boolean checkCrc;
+
+ /** Check that index size and cache size are same. */
+ private boolean checkSizes;
+
/**
* Default constructor.
*/
@@ -56,20 +61,29 @@ public class VisorValidateIndexesTaskArg extends
IgniteDataTransferObject {
}
/**
+ * Constructor.
+ *
* @param caches Caches.
+ * @param nodes Nodes on which task will run.
+ * @param checkFirst Check first K elements.
+ * @param checkThrough Check through K element.
+ * @param checkCrc Check CRC.
+ * @param checkSizes Check that index size and cache size are same.
*/
public VisorValidateIndexesTaskArg(
Set<String> caches,
Set<UUID> nodes,
int checkFirst,
int checkThrough,
- boolean checkCrc
+ boolean checkCrc,
+ boolean checkSizes
) {
this.caches = caches;
this.checkFirst = checkFirst;
this.checkThrough = checkThrough;
this.nodes = nodes;
this.checkCrc = checkCrc;
+ this.checkSizes = checkSizes;
}
/**
@@ -94,6 +108,13 @@ public class VisorValidateIndexesTaskArg extends
IgniteDataTransferObject {
}
/**
+ * @return checkThrough.
+ */
+ public int getCheckThrough() {
+ return checkThrough;
+ }
+
+ /**
* @return checkCrc.
*/
public boolean сheckCrc() {
@@ -101,49 +122,33 @@ public class VisorValidateIndexesTaskArg extends
IgniteDataTransferObject {
}
/**
- * @return checkThrough.
+ * Returns whether to check that index size and cache size are same.
+ *
+ * @return {@code true} if need check that index size and cache size
+ * are same.
*/
- public int getCheckThrough() {
- return checkThrough;
+ public boolean checkSizes() {
+ return checkSizes;
}
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws
IOException {
- U.writeCollection(out, caches);
+ writeCollection(out, caches);
out.writeInt(checkFirst);
out.writeInt(checkThrough);
- U.writeCollection(out, nodes);
+ writeCollection(out, nodes);
out.writeBoolean(checkCrc);
+ out.writeBoolean(checkSizes);
}
/** {@inheritDoc} */
@Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
- caches = U.readSet(in);
-
- if (protoVer > V1) {
- checkFirst = in.readInt();
- checkThrough = in.readInt();
- }
- else {
- checkFirst = -1;
- checkThrough = -1;
- }
-
- if (protoVer > V2)
- nodes = U.readSet(in);
-
- if (protoVer >= V6)
- checkCrc = in.readBoolean();
- }
-
- /** Set checkCrc */
- protected void checkCrc(boolean checkCrc) {
- this.checkCrc = checkCrc;
- }
-
- /** {@inheritDoc} */
- @Override public byte getProtocolVersion() {
- return V6;
+ caches = readSet(in);
+ checkFirst = in.readInt();
+ checkThrough = in.readInt();
+ nodes = readSet(in);
+ checkCrc = in.readBoolean();
+ checkSizes = in.readBoolean();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index ec0af5f..9641d69 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2395,6 +2395,8 @@
org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2
org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult
org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskArg
org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskResult
+org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeIssue
+org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeResult
org.apache.ignite.internal.visor.verify.VisorViewCacheCmd
org.apache.ignite.internal.visor.verify.VisorViewCacheTask
org.apache.ignite.internal.visor.verify.VisorViewCacheTask$VisorViewCacheJob
diff --git
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
index 2b7472d..1c19856 100644
---
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
@@ -111,6 +111,9 @@ public abstract class GridCommandHandlerAbstractTest
extends GridCommonAbstractT
/** {@code True} if encription is enabled. */
protected boolean encriptionEnabled;
+ /** Last operation result. */
+ protected Object lastOperationResult;
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -185,18 +188,17 @@ public abstract class GridCommandHandlerAbstractTest
extends GridCommonAbstractT
if (sslEnabled())
cfg.setSslContextFactory(GridTestUtils.sslFactory());
- DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+ .setWalMode(WALMode.LOG_ONLY)
.setCheckpointFrequency(checkpointFreq)
- .setDefaultDataRegionConfiguration(new
DataRegionConfiguration().setMaxSize(50L * 1024 * 1024));
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(50L * 1024 *
1024).setPersistenceEnabled(true)
+ );
if (dataRegionConfiguration != null)
- memCfg.setDataRegionConfigurations(dataRegionConfiguration);
-
- cfg.setDataStorageConfiguration(memCfg);
+ dsCfg.setDataRegionConfigurations(dataRegionConfiguration);
- DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
- dsCfg.setWalMode(WALMode.LOG_ONLY);
- dsCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
+ cfg.setDataStorageConfiguration(dsCfg);
cfg.setConsistentId(igniteInstanceName);
@@ -265,6 +267,7 @@ public abstract class GridCommandHandlerAbstractTest
extends GridCommonAbstractT
testOut.reset();
int exitCode = hnd.execute(args);
+ lastOperationResult = hnd.getLastOperationResult();
// Flush all Logger handlers to make log data available to test.
Logger logger = U.field(hnd, "logger");
diff --git
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_cache_help.output
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_cache_help.output
index 8a8ce8c..f5518c3 100644
---
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_cache_help.output
+++
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_cache_help.output
@@ -29,13 +29,14 @@ Arguments: --cache help --yes
--groups - print information about groups.
--seq - print information about sequences.
- --cache validate_indexes [cacheName1,...,cacheNameN] [nodeId] [--check-first
N|--check-through K]
+ --cache validate_indexes [cacheName1,...,cacheNameN] [nodeId] [--check-first
N|--check-through K|--check-crc|--check-sizes]
Verify counters and hash sums of primary and backup partitions for the
specified caches/cache groups on an idle cluster and print out the differences,
if any. Cache filtering options configure the set of caches that will be
processed by idle_verify command. Default value for the set of cache names (or
cache group names) is all cache groups. Default value for --exclude-caches is
empty set. Default value for --cache-filter is no filtering. Therefore, the set
of all caches is sequently [...]
Parameters:
--check-first N - validate only the first N keys
--check-through K - validate every Kth key
--check-crc - check the CRC-sum of pages stored on disk
+ --check-sizes - check that index size and cache size are the same
--cache contention minQueueSize [nodeId] [maxPrint]
Show the keys that are point of contention for multiple transactions.
diff --git
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_cache_help.output
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_cache_help.output
index 8a8ce8c..f5518c3 100644
---
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_cache_help.output
+++
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_cache_help.output
@@ -29,13 +29,14 @@ Arguments: --cache help --yes
--groups - print information about groups.
--seq - print information about sequences.
- --cache validate_indexes [cacheName1,...,cacheNameN] [nodeId] [--check-first
N|--check-through K]
+ --cache validate_indexes [cacheName1,...,cacheNameN] [nodeId] [--check-first
N|--check-through K|--check-crc|--check-sizes]
Verify counters and hash sums of primary and backup partitions for the
specified caches/cache groups on an idle cluster and print out the differences,
if any. Cache filtering options configure the set of caches that will be
processed by idle_verify command. Default value for the set of cache names (or
cache group names) is all cache groups. Default value for --exclude-caches is
empty set. Default value for --cache-filter is no filtering. Therefore, the set
of all caches is sequently [...]
Parameters:
--check-first N - validate only the first N keys
--check-through K - validate every Kth key
--check-crc - check the CRC-sum of pages stored on disk
+ --check-sizes - check that index size and cache size are the same
--cache contention minQueueSize [nodeId] [maxPrint]
Show the keys that are point of contention for multiple transactions.
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 6a05b2a..a9271f8 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -3074,8 +3074,8 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public long indexSize(String schemaName, String idxName) throws
IgniteCheckedException {
- GridH2Table tbl = schemaMgr.dataTableForIndex(schemaName, idxName);
+ @Override public long indexSize(String schemaName, String tblName, String
idxName) throws IgniteCheckedException {
+ GridH2Table tbl = schemaMgr.dataTable(schemaName, tblName);
if (tbl == null)
return 0;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index 9a69d62..cf96d60 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -16,8 +16,6 @@
*/
package org.apache.ignite.internal.visor.verify;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -25,8 +23,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -34,6 +34,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
@@ -41,15 +42,12 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
@@ -70,6 +68,7 @@ import
org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
@@ -79,16 +78,25 @@ import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.message.DbException;
+import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.newSetFromMap;
+import static java.util.Collections.shuffle;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.util.IgniteUtils.error;
/**
* Closure that locally validates indexes of given caches.
- * Validation consists of three checks:
+ * Validation consists of four checks:
* 1. If entry is present in cache data tree, it's reachable from all cache
SQL indexes
* 2. If entry is present in cache SQL index, it can be dereferenced with link
from index
* 3. If entry is present in cache SQL index, it's present in cache data tree
+ * 4. If size of cache and index on same table are not same
*/
public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndexesJobResult> {
/** */
@@ -103,7 +111,7 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
private IgniteLogger log;
/** Cache names. */
- private Set<String> cacheNames;
+ private final Set<String> cacheNames;
/** If provided only first K elements will be validated. */
private final int checkFirst;
@@ -111,8 +119,11 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
/** If provided only each Kth element will be validated. */
private final int checkThrough;
- /** Check CRC */
- private boolean checkCrc;
+ /** Check CRC. */
+ private final boolean checkCrc;
+
+ /** Check that index size and cache size are same. */
+ private final boolean checkSizes;
/** Counter of processed partitions. */
private final AtomicInteger processedPartitions = new AtomicInteger(0);
@@ -126,6 +137,12 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
/** Counter of integrity checked indexes. */
private final AtomicInteger integrityCheckedIndexes = new AtomicInteger(0);
+ /** Counter of calculated sizes of caches per partitions. */
+ private final AtomicInteger processedCacheSizePartitions = new
AtomicInteger(0);
+
+ /** Counter of calculated index sizes. */
+ private final AtomicInteger processedIdxSizes = new AtomicInteger(0);
+
/** Total partitions. */
private volatile int totalIndexes;
@@ -138,17 +155,24 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
/** Calculation executor. */
private volatile ExecutorService calcExecutor;
+ /** Group cache ids when calculating cache size was an error. */
+ private final Set<Integer> failCalcCacheSizeGrpIds = newSetFromMap(new
ConcurrentHashMap<>());
+
/**
+ * Constructor.
+ *
* @param cacheNames Cache names.
* @param checkFirst If positive only first K elements will be validated.
* @param checkThrough If positive only each Kth element will be validated.
* @param checkCrc Check CRC sum on stored pages on disk.
+ * @param checkSizes Check that index size and cache size are same.
*/
- public ValidateIndexesClosure(Set<String> cacheNames, int checkFirst, int
checkThrough, boolean checkCrc) {
+ public ValidateIndexesClosure(Set<String> cacheNames, int checkFirst, int
checkThrough, boolean checkCrc, boolean checkSizes) {
this.cacheNames = cacheNames;
this.checkFirst = checkFirst;
this.checkThrough = checkThrough;
this.checkCrc = checkCrc;
+ this.checkSizes = checkSizes;
}
/** {@inheritDoc} */
@@ -206,6 +230,9 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
List<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>>
procPartFutures = new ArrayList<>();
List<Future<Map<String, ValidateIndexesPartitionResult>>>
procIdxFutures = new ArrayList<>();
+ List<T3<CacheGroupContext, GridDhtLocalPartition, Future<CacheSize>>>
cacheSizeFutures = new ArrayList<>();
+ List<T3<GridCacheContext, Index, Future<T2<Throwable, Long>>>>
idxSizeFutures = new ArrayList<>();
+
List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new
ArrayList<>();
List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>();
@@ -213,37 +240,36 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults =
integrityCheckIndexesPartitions(grpIds);
+ GridQueryProcessor qryProcessor = ignite.context().query();
+ IgniteH2Indexing h2Indexing =
(IgniteH2Indexing)qryProcessor.getIndexing();
+
for (Integer grpId : grpIds) {
CacheGroupContext grpCtx =
ignite.context().cache().cacheGroup(grpId);
- if (grpCtx == null || integrityCheckResults.containsKey(grpId))
+ if (isNull(grpCtx) || integrityCheckResults.containsKey(grpId))
continue;
- List<GridDhtLocalPartition> parts =
grpCtx.topology().localPartitions();
-
- for (GridDhtLocalPartition part : parts)
+ for (GridDhtLocalPartition part :
grpCtx.topology().localPartitions())
partArgs.add(new T2<>(grpCtx, part));
- GridQueryProcessor qry = ignite.context().query();
-
- IgniteH2Indexing indexing = (IgniteH2Indexing)qry.getIndexing();
-
for (GridCacheContext ctx : grpCtx.caches()) {
- if (cacheNames == null || cacheNames.contains(ctx.name())) {
- Collection<GridQueryTypeDescriptor> types =
qry.types(ctx.name());
+ String cacheName = ctx.name();
- if (!F.isEmpty(types)) {
- for (GridQueryTypeDescriptor type : types) {
- GridH2Table gridH2Tbl =
indexing.schemaManager().dataTable(ctx.name(), type.tableName());
+ if (cacheNames == null || cacheNames.contains(cacheName)) {
+ Collection<GridQueryTypeDescriptor> types =
qryProcessor.types(cacheName);
- if (gridH2Tbl == null)
- continue;
+ if (F.isEmpty(types))
+ continue;
- ArrayList<Index> indexes = gridH2Tbl.getIndexes();
+ for (GridQueryTypeDescriptor type : types) {
+ GridH2Table gridH2Tbl =
h2Indexing.schemaManager().dataTable(cacheName, type.tableName());
- for (Index idx : indexes)
- if (idx instanceof H2TreeIndexBase)
- idxArgs.add(new T2<>(ctx, idx));
+ if (isNull(gridH2Tbl))
+ continue;
+
+ for (Index idx : gridH2Tbl.getIndexes()) {
+ if (idx instanceof H2TreeIndexBase)
+ idxArgs.add(new T2<>(ctx, idx));
}
}
}
@@ -251,8 +277,8 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
}
// To decrease contention on same indexes.
- Collections.shuffle(partArgs);
- Collections.shuffle(idxArgs);
+ shuffle(partArgs);
+ shuffle(idxArgs);
totalPartitions = partArgs.size();
totalIndexes = idxArgs.size();
@@ -263,11 +289,30 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
for (T2<GridCacheContext, Index> t2 : idxArgs)
procIdxFutures.add(processIndexAsync(t2.get1(), t2.get2()));
+ if (checkSizes) {
+ for (T2<CacheGroupContext, GridDhtLocalPartition> partArg :
partArgs) {
+ CacheGroupContext cacheGrpCtx = partArg.get1();
+ GridDhtLocalPartition locPart = partArg.get2();
+
+ cacheSizeFutures.add(new T3<>(cacheGrpCtx, locPart,
calcCacheSizeAsync(cacheGrpCtx, locPart)));
+ }
+
+ for (T2<GridCacheContext, Index> idxArg : idxArgs) {
+ GridCacheContext cacheCtx = idxArg.get1();
+ Index idx = idxArg.get2();
+
+ idxSizeFutures.add(new T3<>(cacheCtx, idx,
calcIndexSizeAsync(cacheCtx, idx)));
+ }
+ }
+
Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new
HashMap<>();
Map<String, ValidateIndexesPartitionResult> idxResults = new
HashMap<>();
+ Map<String, ValidateIndexesCheckSizeResult> checkSizeResults = new
HashMap<>();
int curPart = 0;
int curIdx = 0;
+ int curCacheSize = 0;
+ int curIdxSize = 0;
try {
for (; curPart < procPartFutures.size(); curPart++) {
Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut
= procPartFutures.get(curPart);
@@ -287,6 +332,14 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
idxResults.putAll(idxRes);
}
+ for (; curCacheSize < cacheSizeFutures.size(); curCacheSize++)
+ cacheSizeFutures.get(curCacheSize).get3().get();
+
+ for (; curIdxSize < idxSizeFutures.size(); curIdxSize++)
+ idxSizeFutures.get(curIdxSize).get3().get();
+
+ checkSizes(cacheSizeFutures, idxSizeFutures, checkSizeResults);
+
log.warning("ValidateIndexesClosure finished: processed " +
totalPartitions + " partitions and "
+ totalIndexes + " indexes.");
}
@@ -297,10 +350,21 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
for (int j = curIdx; j < procIdxFutures.size(); j++)
procIdxFutures.get(j).cancel(false);
+ for (int j = curCacheSize; j < cacheSizeFutures.size(); j++)
+ cacheSizeFutures.get(j).get3().cancel(false);
+
+ for (int j = curIdxSize; j < idxSizeFutures.size(); j++)
+ idxSizeFutures.get(j).get3().cancel(false);
+
throw unwrapFutureException(e);
}
- return new VisorValidateIndexesJobResult(partResults, idxResults,
integrityCheckResults.values());
+ return new VisorValidateIndexesJobResult(
+ partResults,
+ idxResults,
+ integrityCheckResults.values(),
+ checkSizeResults
+ );
}
/**
@@ -343,7 +407,7 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
}
if (checkCrc) {
- for (Integer grpId: grpIds) {
+ for (Integer grpId : grpIds) {
final CacheGroupContext grpCtx =
ignite.context().cache().cacheGroup(grpId);
if (grpCtx == null || !grpCtx.persistenceEnabled()) {
@@ -353,13 +417,13 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
}
Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut =
- calcExecutor.submit(new Callable<T2<Integer,
IndexIntegrityCheckIssue>>() {
- @Override public T2<Integer,
IndexIntegrityCheckIssue> call() throws Exception {
- IndexIntegrityCheckIssue issue =
integrityCheckIndexPartition(grpCtx, cpFlag);
+ calcExecutor.submit(new Callable<T2<Integer,
IndexIntegrityCheckIssue>>() {
+ @Override public T2<Integer,
IndexIntegrityCheckIssue> call() throws Exception {
+ IndexIntegrityCheckIssue issue =
integrityCheckIndexPartition(grpCtx, cpFlag);
- return new T2<>(grpCtx.groupId(), issue);
- }
- });
+ return new T2<>(grpCtx.groupId(), issue);
+ }
+ });
integrityCheckFutures.add(checkFut);
}
@@ -412,7 +476,7 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
finally {
integrityCheckedIndexes.incrementAndGet();
- printProgressIfNeeded("Current progress of ValidateIndexesClosure:
checked integrity of "
+ printProgressIfNeeded(() -> "Current progress of
ValidateIndexesClosure: checked integrity of "
+ integrityCheckedIndexes.get() + " index partitions of "
+ totalCacheGrps + " cache groups");
}
}
@@ -441,13 +505,13 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
GridDhtLocalPartition part
) {
if (!part.reserve())
- return Collections.emptyMap();
+ return emptyMap();
ValidateIndexesPartitionResult partRes;
try {
- if (part.state() != GridDhtPartitionState.OWNING)
- return Collections.emptyMap();
+ if (part.state() != OWNING)
+ return emptyMap();
long updateCntrBefore = part.updateCounter();
@@ -465,19 +529,6 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
GridQueryProcessor qryProcessor = ignite.context().query();
- Method m;
- try {
- m = GridQueryProcessor.class.getDeclaredMethod("typeByValue",
String.class,
- CacheObjectContext.class, KeyCacheObject.class,
CacheObject.class, boolean.class);
- }
- catch (NoSuchMethodException e) {
- log.error("Failed to invoke typeByValue", e);
-
- throw new IgniteException(e);
- }
-
- m.setAccessible(true);
-
final boolean skipConditions = checkFirst > 0 || checkThrough > 0;
final boolean bothSkipConditions = checkFirst > 0 && checkThrough
> 0;
@@ -532,68 +583,59 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
continue;
}
- try {
- QueryTypeDescriptorImpl res =
(QueryTypeDescriptorImpl)m.invoke(
- qryProcessor, cacheCtx.name(),
cacheCtx.cacheObjectContext(), row.key(), row.value(), true);
-
- if (res == null)
- continue; // Tolerate - (k, v) is just not indexed.
+ QueryTypeDescriptorImpl res = qryProcessor.typeByValue(
+ cacheCtx.name(),
+ cacheCtx.cacheObjectContext(),
+ row.key(),
+ row.value(),
+ true
+ );
- IgniteH2Indexing indexing =
(IgniteH2Indexing)qryProcessor.getIndexing();
+ if (res == null)
+ continue; // Tolerate - (k, v) is just not indexed.
- GridH2Table gridH2Tbl =
indexing.schemaManager().dataTable(cacheCtx.name(), res.tableName());
+ IgniteH2Indexing indexing =
(IgniteH2Indexing)qryProcessor.getIndexing();
- if (gridH2Tbl == null)
- continue; // Tolerate - (k, v) is just not indexed.
+ GridH2Table gridH2Tbl =
indexing.schemaManager().dataTable(cacheCtx.name(), res.tableName());
- GridH2RowDescriptor gridH2RowDesc =
gridH2Tbl.rowDescriptor();
+ if (gridH2Tbl == null)
+ continue; // Tolerate - (k, v) is just not indexed.
- H2CacheRow h2Row = gridH2RowDesc.createRow(row);
+ GridH2RowDescriptor gridH2RowDesc = gridH2Tbl.rowDescriptor();
- ArrayList<Index> indexes = gridH2Tbl.getIndexes();
-
- for (Index idx : indexes) {
- if (!(idx instanceof H2TreeIndexBase))
- continue;
+ H2CacheRow h2Row = gridH2RowDesc.createRow(row);
- try {
- Cursor cursor = idx.find((Session) null, h2Row,
h2Row);
+ ArrayList<Index> indexes = gridH2Tbl.getIndexes();
- if (cursor == null || !cursor.next())
- throw new IgniteCheckedException("Key is
present in CacheDataTree, but can't be found in SQL index.");
- }
- catch (Throwable t) {
- Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
- grpCtx.cacheObjectContext(), row.key(), true,
true);
-
- IndexValidationIssue is = new IndexValidationIssue(
- o.toString(), cacheCtx.name(), idx.getName(),
t);
+ for (Index idx : indexes) {
+ if (!(idx instanceof H2TreeIndexBase))
+ continue;
- log.error("Failed to lookup key: " +
is.toString(), t);
+ try {
+ Cursor cursor = idx.find((Session)null, h2Row, h2Row);
- enoughIssues |= partRes.reportIssue(is);
- }
+ if (cursor == null || !cursor.next())
+ throw new IgniteCheckedException("Key is present
in CacheDataTree, but can't be found in SQL index.");
}
- }
- catch (IllegalAccessException e) {
- log.error("Failed to invoke typeByValue", e);
+ catch (Throwable t) {
+ Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
+ grpCtx.cacheObjectContext(), row.key(), true,
true);
- throw new IgniteException(e);
- }
- catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
+ IndexValidationIssue is = new IndexValidationIssue(
+ o.toString(), cacheCtx.name(), idx.getName(), t);
- log.error("Failed to invoke typeByValue", target);
+ log.error("Failed to lookup key: " + is.toString(), t);
- throw new IgniteException(target);
+ enoughIssues |= partRes.reportIssue(is);
+ }
}
}
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to process partition [grpId=" +
grpCtx.groupId() +
+ error(log, "Failed to process partition [grpId=" +
grpCtx.groupId() +
", partId=" + part.id() + "]", e);
- return Collections.emptyMap();
+ return emptyMap();
}
finally {
part.release();
@@ -612,20 +654,23 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
*
*/
private void printProgressOfIndexValidationIfNeeded() {
- printProgressIfNeeded("Current progress of ValidateIndexesClosure:
processed " +
- processedPartitions.get() + " of " + totalPartitions + "
partitions, " +
- processedIndexes.get() + " of " + totalIndexes + " SQL
indexes");
+ printProgressIfNeeded(() -> "Current progress of
ValidateIndexesClosure: processed " +
+ processedPartitions.get() + " of " + totalPartitions + "
partitions, " +
+ processedIndexes.get() + " of " + totalIndexes + " SQL indexes" +
+ (checkSizes ? ", " + processedCacheSizePartitions.get() + " of " +
totalPartitions +
+ " calculate cache size per partitions, " +
processedIdxSizes.get() + " of " + totalIndexes +
+ "calculate index size" : ""));
}
/**
*
*/
- private void printProgressIfNeeded(String msg) {
+ private void printProgressIfNeeded(Supplier<String> msgSup) {
long curTs = U.currentTimeMillis();
long lastTs = lastProgressPrintTs.get();
if (curTs - lastTs >= 60_000 &&
lastProgressPrintTs.compareAndSet(lastTs, curTs))
- log.warning(msg);
+ log.warning(msgSup.get());
}
/**
@@ -780,4 +825,277 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
else
return new IgniteException(e.getCause());
}
+
+ /**
+ * Asynchronous calculation of caches size with divided by tables.
+ *
+ * @param grpCtx Cache group context.
+ * @param locPart Local partition.
+ * @return Future with cache sizes.
+ */
+ private Future<CacheSize> calcCacheSizeAsync(
+ CacheGroupContext grpCtx,
+ GridDhtLocalPartition locPart
+ ) {
+ return calcExecutor.submit(() -> {
+ try {
+ int grpId = grpCtx.groupId();
+
+ if (failCalcCacheSizeGrpIds.contains(grpId))
+ return new CacheSize(null, null);
+
+ boolean reserve = false;
+
+ int partId = locPart.id();
+
+ try {
+ if (!(reserve = locPart.reserve()))
+ throw new IgniteException("Can't reserve partition");
+
+ if (locPart.state() != OWNING)
+ throw new IgniteException("Partition not in state " +
OWNING);
+
+ Map<Integer, Map<String, AtomicLong>> cacheSizeByTbl = new
HashMap<>();
+
+ GridIterator<CacheDataRow> partIter =
grpCtx.offheap().partitionIterator(partId);
+
+ GridQueryProcessor qryProcessor = ignite.context().query();
+ IgniteH2Indexing h2Indexing =
(IgniteH2Indexing)qryProcessor.getIndexing();
+
+ while (partIter.hasNextX() &&
!failCalcCacheSizeGrpIds.contains(grpId)) {
+ CacheDataRow cacheDataRow = partIter.nextX();
+
+ int cacheId = cacheDataRow.cacheId();
+
+ GridCacheContext cacheCtx = cacheId == 0 ?
+ grpCtx.singleCacheContext() :
grpCtx.shared().cacheContext(cacheId);
+
+ if (cacheCtx == null)
+ throw new IgniteException("Unknown cacheId of
CacheDataRow: " + cacheId);
+
+ if (cacheDataRow.link() == 0L)
+ throw new IgniteException("Contains invalid
partition row, possibly deleted");
+
+ String cacheName = cacheCtx.name();
+
+ QueryTypeDescriptorImpl qryTypeDesc =
qryProcessor.typeByValue(
+ cacheName,
+ cacheCtx.cacheObjectContext(),
+ cacheDataRow.key(),
+ cacheDataRow.value(),
+ true
+ );
+
+ if (isNull(qryTypeDesc))
+ continue; // Tolerate - (k, v) is just not indexed.
+
+ String tableName = qryTypeDesc.tableName();
+
+ GridH2Table gridH2Tbl =
h2Indexing.schemaManager().dataTable(cacheName, tableName);
+
+ if (isNull(gridH2Tbl))
+ continue; // Tolerate - (k, v) is just not indexed.
+
+ cacheSizeByTbl.computeIfAbsent(cacheCtx.cacheId(), i
-> new HashMap<>())
+ .computeIfAbsent(tableName, s -> new
AtomicLong()).incrementAndGet();
+ }
+
+ return new CacheSize(null, cacheSizeByTbl);
+ }
+ catch (Throwable t) {
+ IgniteException cacheSizeErr = new IgniteException("Cache
size calculation error [" +
+ cacheGrpInfo(grpCtx) + ", locParId=" + partId + ",
err=" + t.getMessage() + "]", t);
+
+ error(log, cacheSizeErr);
+
+ failCalcCacheSizeGrpIds.add(grpId);
+
+ return new CacheSize(cacheSizeErr, null);
+ }
+ finally {
+ if (reserve)
+ locPart.release();
+ }
+ }
+ finally {
+ processedCacheSizePartitions.incrementAndGet();
+
+ printProgressOfIndexValidationIfNeeded();
+ }
+ });
+ }
+
+ /**
+ * Asynchronous calculation of the index size for cache.
+ *
+ * @param cacheCtx Cache context.
+ * @param idx Index.
+ * @return Future with index size.
+ */
+ private Future<T2<Throwable, Long>> calcIndexSizeAsync(
+ GridCacheContext cacheCtx,
+ Index idx
+ ) {
+ return calcExecutor.submit(() -> {
+ try {
+ if (failCalcCacheSizeGrpIds.contains(cacheCtx.groupId()))
+ return new T2<>(null, 0L);
+
+ String cacheName = cacheCtx.name();
+ String tblName = idx.getTable().getName();
+ String idxName = idx.getName();
+
+ try {
+ long indexSize =
ignite.context().query().getIndexing().indexSize(cacheName, tblName, idxName);
+ return new T2<>(null, indexSize);
+ }
+ catch (Throwable t) {
+ Throwable idxSizeErr = new IgniteException("Index size
calculation error [" +
+ cacheGrpInfo(cacheCtx.group()) + ", " +
cacheInfo(cacheCtx) + ", tableName=" +
+ tblName + ", idxName=" + idxName + ", err=" +
t.getMessage() + "]", t);
+
+ error(log, idxSizeErr);
+
+ return new T2<>(idxSizeErr, 0L);
+ }
+ }
+ finally {
+ processedIdxSizes.incrementAndGet();
+
+ printProgressOfIndexValidationIfNeeded();
+ }
+ });
+ }
+
+ /**
+ * Return cache group info string.
+ *
+ * @param cacheGrpCtx Cache group context.
+ * @return Cache group info string.
+ */
+ private String cacheGrpInfo(CacheGroupContext cacheGrpCtx) {
+ return "cacheGrpName=" + cacheGrpCtx.name() + ", cacheGrpId=" +
cacheGrpCtx.groupId();
+ }
+
+ /**
+ * Return cache info string.
+ *
+ * @param cacheCtx Cache context.
+ * @return Cache info string.
+ */
+ private String cacheInfo(GridCacheContext cacheCtx) {
+ return "cacheName=" + cacheCtx.name() + ", cacheId=" +
cacheCtx.cacheId();
+ }
+
+ /**
+ * Checking size of records in cache and indexes with a record into
+ * {@code checkSizeRes} if they are not equal.
+ *
+ * @param cacheSizesFutures Futures calculating size of records in caches.
+ * @param idxSizeFutures Futures calculating size of indexes of caches.
+ * @param checkSizeRes Result of size check.
+ */
+ private void checkSizes(
+ List<T3<CacheGroupContext, GridDhtLocalPartition, Future<CacheSize>>>
cacheSizesFutures,
+ List<T3<GridCacheContext, Index, Future<T2<Throwable, Long>>>>
idxSizeFutures,
+ Map<String, ValidateIndexesCheckSizeResult> checkSizeRes
+ ) throws ExecutionException, InterruptedException {
+ if (!checkSizes)
+ return;
+
+ Map<Integer, CacheSize> cacheSizeTotal = new HashMap<>();
+
+ for (T3<CacheGroupContext, GridDhtLocalPartition, Future<CacheSize>>
cacheSizeFut : cacheSizesFutures) {
+ CacheGroupContext cacheGrpCtx = cacheSizeFut.get1();
+ CacheSize cacheSize = cacheSizeFut.get3().get();
+
+ Throwable cacheSizeErr = cacheSize.err;
+
+ int grpId = cacheGrpCtx.groupId();
+
+ if (failCalcCacheSizeGrpIds.contains(grpId) &&
nonNull(cacheSizeErr)) {
+ checkSizeRes.computeIfAbsent(
+ cacheGrpInfo(cacheGrpCtx),
+ s -> new ValidateIndexesCheckSizeResult(0, new
ArrayList<>())
+ ).issues().add(new ValidateIndexesCheckSizeIssue(null, 0,
cacheSizeErr));
+ } else {
+ cacheSizeTotal.computeIfAbsent(grpId, i -> new CacheSize(null,
new HashMap<>()))
+ .merge(cacheSize.cacheSizePerTbl);
+ }
+ }
+
+ for (T3<GridCacheContext, Index, Future<T2<Throwable, Long>>>
idxSizeFut : idxSizeFutures) {
+ GridCacheContext cacheCtx = idxSizeFut.get1();
+
+ int grpId = cacheCtx.groupId();
+
+ if (failCalcCacheSizeGrpIds.contains(grpId))
+ continue;
+
+ Index idx = idxSizeFut.get2();
+ String tblName = idx.getTable().getName();
+
+ AtomicLong cacheSizeObj = cacheSizeTotal.get(grpId).cacheSizePerTbl
+ .getOrDefault(cacheCtx.cacheId(), emptyMap()).get(tblName);
+
+ long cacheSizeByTbl = isNull(cacheSizeObj) ? 0L :
cacheSizeObj.get();
+
+ T2<Throwable, Long> idxSizeRes = idxSizeFut.get3().get();
+
+ Throwable err = idxSizeRes.get1();
+ long idxSize = idxSizeRes.get2();
+
+ if (isNull(err) && idxSize != cacheSizeByTbl)
+ err = new IgniteException("Cache and index size not same.");
+
+ if (nonNull(err)) {
+ checkSizeRes.computeIfAbsent(
+ "[" + cacheGrpInfo(cacheCtx.group()) + ", " +
cacheInfo(cacheCtx) + ", tableName=" + tblName + "]",
+ s -> new ValidateIndexesCheckSizeResult(cacheSizeByTbl,
new ArrayList<>()))
+ .issues().add(new
ValidateIndexesCheckSizeIssue(idx.getName(), idxSize, err));
+ }
+ }
+ }
+
+ /**
+ * Container class for calculating the size of cache, divided by tables.
+ */
+ private static class CacheSize {
+ /** Error calculating size of the cache. */
+ final Throwable err;
+
+ /** Table split cache size, {@code Map<CacheId, Map<TableName,
Size>>}. */
+ final Map<Integer, Map<String, AtomicLong>> cacheSizePerTbl;
+
+ /**
+ * Constructor.
+ *
+ * @param err Error calculating size of the cache.
+ * @param cacheSizePerTbl Table split cache size.
+ */
+ public CacheSize(
+ @Nullable Throwable err,
+ @Nullable Map<Integer, Map<String, AtomicLong>> cacheSizePerTbl
+ ) {
+ this.err = err;
+ this.cacheSizePerTbl = cacheSizePerTbl;
+ }
+
+ /**
+ * Merging cache sizes separated by tables.
+ *
+ * @param other Other table split cache size.
+ */
+ void merge(Map<Integer, Map<String, AtomicLong>> other) {
+ assert nonNull(cacheSizePerTbl);
+
+ for (Entry<Integer, Map<String, AtomicLong>> cacheEntry :
other.entrySet()) {
+ for (Entry<String, AtomicLong> tableEntry :
cacheEntry.getValue().entrySet()) {
+ cacheSizePerTbl.computeIfAbsent(cacheEntry.getKey(), i ->
new HashMap<>())
+ .computeIfAbsent(tableEntry.getKey(), s -> new
AtomicLong())
+ .addAndGet(tableEntry.getValue().get());
+ }
+ }
+ }
+ }
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
index dc5972f..08e85e5 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
@@ -109,7 +109,8 @@ public class VisorValidateIndexesTask extends
VisorMultiNodeTask<VisorValidateIn
arg.getCaches(),
arg.getCheckFirst(),
arg.getCheckThrough(),
- arg.сheckCrc()
+ arg.сheckCrc(),
+ arg.checkSizes()
);
ignite.context().resource().injectGeneric(clo);
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
index 7db43f5..4253f40 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
@@ -156,7 +156,7 @@ public class IndexingMultithreadedLoadContinuousRestartTest
extends GridCommonAb
forceCheckpoint();
// Validate indexes on start.
- ValidateIndexesClosure clo = new
ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false);
+ ValidateIndexesClosure clo = new
ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
ignite.context().resource().injectGeneric(clo);
VisorValidateIndexesJobResult res = clo.call();
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
index 97f6925..39d2b46 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
@@ -322,7 +322,7 @@ public class LongDestroyDurableBackgroundTaskTest extends
GridCommonAbstractTest
log.info("Doing indexes validation.");
VisorValidateIndexesTaskArg taskArg =
- new
VisorValidateIndexesTaskArg(Collections.singleton("SQL_PUBLIC_T"), nodeIds, 0,
1, true);
+ new
VisorValidateIndexesTaskArg(Collections.singleton("SQL_PUBLIC_T"), nodeIds, 0,
1, true, true);
VisorValidateIndexesTaskResult taskRes =
ignite.compute().execute(VisorValidateIndexesTask.class.getName(),
new VisorTaskArgument<>(nodeIds, taskArg, false));
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
index bf2a3c0..b98bdf2 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
@@ -180,7 +180,7 @@ public class RebuildIndexTest extends
GridCommonAbstractTest {
doSleep(500);
// Validate indexes on start.
- ValidateIndexesClosure clo = new
ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false);
+ ValidateIndexesClosure clo = new
ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
node.context().resource().injectGeneric(clo);
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
index a7e07e3..75590b8 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
@@ -229,7 +229,7 @@ public class RebuildIndexWithHistoricalRebalanceTest
extends GridCommonAbstractT
awaitPartitionMapExchange();
- ValidateIndexesClosure clo = new
ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false);
+ ValidateIndexesClosure clo = new
ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
node2.context().resource().injectGeneric(clo);
VisorValidateIndexesJobResult res = clo.call();
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexFullRebuildTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexFullRebuildTest.java
index 2baa378..fa221a2 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexFullRebuildTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexFullRebuildTest.java
@@ -206,7 +206,7 @@ public class GridIndexFullRebuildTest extends
GridCommonAbstractTest {
((IgniteProcessProxy)grid(2)).getId());
VisorValidateIndexesTaskArg arg = new VisorValidateIndexesTaskArg(null,
- null, 10000, 1, true);
+ null, 10000, 1, true, true);
VisorTaskArgument<VisorValidateIndexesTaskArg> argument = new
VisorTaskArgument<>(nodes, arg, true);
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingAndPersistenceTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingAndPersistenceTestSuite.java
index 5cac08a..309f784 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingAndPersistenceTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingAndPersistenceTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.cache.StartCachesInParallelTest;
import
org.apache.ignite.internal.processors.cache.index.IoStatisticsBasicIndexSelfTest;
import org.apache.ignite.util.GridCommandHandlerBrokenIndexTest;
+import org.apache.ignite.util.GridCommandHandlerIndexingCheckSizeTest;
import org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassTest;
import
org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerIndexingTest;
@@ -37,6 +38,7 @@ import org.junit.runners.Suite;
GridCommandHandlerIndexingWithSSLTest.class,
GridCommandHandlerIndexingClusterByClassTest.class,
GridCommandHandlerIndexingClusterByClassWithSSLTest.class,
+ GridCommandHandlerIndexingCheckSizeTest.class,
StartCachesInParallelTest.class,
IoStatisticsBasicIndexSelfTest.class
})
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingCheckSizeTest.java
b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingCheckSizeTest.java
new file mode 100644
index 0000000..2d853ed
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingCheckSizeTest.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.commandline.cache.CacheSubcommands;
+import
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeIssue;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeResult;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskResult;
+import org.apache.ignite.util.GridCommandHandlerIndexingUtils.Organization;
+import org.apache.ignite.util.GridCommandHandlerIndexingUtils.Person;
+import org.junit.Test;
+
+import static java.lang.String.valueOf;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
+import static java.util.Objects.requireNonNull;
+import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.commandline.CommandList.CACHE;
+import static
org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES;
+import static
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_SIZES;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.CACHE_NAME;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.GROUP_NAME;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakCacheDataTree;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakSqlIndex;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.organizationEntity;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.personEntity;
+
+/**
+ * Class for testing function of checking size of index and cache in
+ * {@link CacheSubcommands#VALIDATE_INDEXES}.
+ */
+public class GridCommandHandlerIndexingCheckSizeTest extends
GridCommandHandlerClusterByClassAbstractTest {
+ /** Entry count for entity. */
+ private static final int ENTRY_CNT = 100;
+
+ /** Non persistent data region name. */
+ private static final String NON_PERSIST_REGION = "non-persist";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ createAndFillCache(client, CACHE_NAME, GROUP_NAME, null,
queryEntities(), ENTRY_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getDataStorageConfiguration().setDataRegionConfigurations(
+ new DataRegionConfiguration().setName(NON_PERSIST_REGION)
+ );
+
+ return cfg;
+ }
+
+ /**
+ * Test checks that an error will be displayed checking cache size and
+ * index when cache is broken.
+ */
+ @Test
+ public void testCheckCacheSizeWhenBrokenCache() {
+ validateCheckSizesAfterBreakCacheDataTree(crd, CACHE_NAME, ENTRY_CNT);
+ }
+
+ /**
+ * Test checks that cache size and index validation error will not be
+ * displayed if cache is broken, because argument
+ * {@link ValidateIndexesCommandArg#CHECK_SIZES} not used.
+ */
+ @Test
+ public void testNoCheckCacheSizeWhenBrokenCache() {
+ String cacheName = CACHE_NAME;
+
+ breakCacheDataTree(log, crd.cachex(cacheName), 1, null);
+
+ checkNoCheckSizeInCaseBrokenData(cacheName);
+ }
+
+ /**
+ * Test checks that an error will be displayed checking cache size and
+ * index when index is broken.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCheckCacheSizeWhenBrokenIdx() throws Exception {
+ validateCheckSizesAfterBreakSqlIndex(crd, CACHE_NAME, ENTRY_CNT);
+ }
+
+ /**
+ * Test checks that cache size and index validation error will not be
+ * displayed if index is broken, because argument
+ * {@link ValidateIndexesCommandArg#CHECK_SIZES} not used.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoCheckCacheSizeWhenBrokenIdx() throws Exception {
+ String cacheName = CACHE_NAME;
+
+ breakSqlIndex(crd.cachex(cacheName), 1, null);
+
+ checkNoCheckSizeInCaseBrokenData(cacheName);
+ }
+
+ /**
+ * Test that checks that there will be no errors when executing command
+ * "validate_indexes" with/without "--check-sizes" on the cache without
+ * {@link QueryEntity}.
+ */
+ @Test
+ public void testNoErrorOnCacheWithoutQueryEntity() {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ createAndFillCache(crd, cacheName, GROUP_NAME, null, emptyMap(), 0);
+
+ try (IgniteDataStreamer<Object, Object> streamer =
crd.dataStreamer(cacheName)) {
+ for (int i = 0; i < ENTRY_CNT; i++)
+ streamer.addData(i, new Person(i, "p_" + i));
+
+ streamer.flush();
+ }
+
+ execVIWithNoErrCheck(cacheName, false);
+ execVIWithNoErrCheck(cacheName, true);
+ }
+
+ /**
+ * Test that checks that there will be no errors when executing command
+ * "validate_indexes" with/without "--check-sizes" on the empty cache
+ * with {@link QueryEntity}.
+ */
+ @Test
+ public void testNoErrorOnEmptyCacheWithQueryEntity() {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ createAndFillCache(crd, cacheName, GROUP_NAME, null, queryEntities(),
0);
+
+ execVIWithNoErrCheck(cacheName, false);
+ execVIWithNoErrCheck(cacheName, true);
+ }
+
+ /**
+ * Test checks that there will be no errors if there are entries without
+ * {@link QueryEntity} in cache.
+ */
+ @Test
+ public void testNoErrorOnCacheWithEntryWithoutQueryEntity() {
+ String cacheName = CACHE_NAME;
+
+ int cacheSize = crd.cachex(cacheName).size();
+
+ try (IgniteDataStreamer<Object, Object> streamer =
crd.dataStreamer(cacheName)) {
+ for (int i = cacheSize; i < cacheSize + ENTRY_CNT; i++)
+ streamer.addData(i, i);
+
+ streamer.flush();
+ }
+
+ execVIWithNoErrCheck(cacheName, false);
+ execVIWithNoErrCheck(cacheName, true);
+ }
+
+ /**
+ * Test checks that there will be no errors if there are entries without
+ * {@link QueryEntity} in cache, and also if there are null values.
+ */
+ @Test
+ public void
testNoErrorOnCacheWithEntryWithoutQueryEntityAndWithNullValues() {
+ String cacheName = CACHE_NAME;
+
+ int cacheSize = crd.cachex(cacheName).size();
+
+ try (IgniteDataStreamer<Object, Object> streamer =
crd.dataStreamer(cacheName)) {
+ int i = cacheSize;
+
+ for (; i < cacheSize + (ENTRY_CNT - 10); i++)
+ streamer.addData(i, i);
+
+ for (; i < cacheSize + ENTRY_CNT; i++)
+ streamer.addData(i, null);
+
+ streamer.flush();
+ }
+
+ execVIWithNoErrCheck(cacheName, false);
+ execVIWithNoErrCheck(cacheName, true);
+ }
+
+ /**
+ * Test checks that an error will be displayed checking cache size and
+ * index when cache is broken. In {@link #NON_PERSIST_REGION} region.
+ */
+ @Test
+ public void testCheckCacheSizeWhenBrokenCacheInNonPersistRegion() {
+ IgniteEx node = crd;
+ String cacheName = CACHE_NAME + "_new";
+
+ createAndFillCache(node, cacheName, GROUP_NAME + "_new",
NON_PERSIST_REGION, queryEntities(), ENTRY_CNT);
+
+ validateCheckSizesAfterBreakCacheDataTree(node, cacheName, ENTRY_CNT);
+ }
+
+ /**
+ * Test checks that an error will be displayed checking cache size and
+ * index when index is broken. In {@link #NON_PERSIST_REGION} region.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCheckCacheSizeWhenBrokenIdxInNonPersistRegion() throws
Exception {
+ IgniteEx node = crd;
+ String cacheName = CACHE_NAME + "_new";
+
+ createAndFillCache(node, cacheName, GROUP_NAME + "_new",
NON_PERSIST_REGION, queryEntities(), ENTRY_CNT);
+
+ validateCheckSizesAfterBreakSqlIndex(node, cacheName, ENTRY_CNT);
+ }
+
+ /**
+ * Test checks that an error will be displayed checking cache size and
+ * index when cache is broken. In case of dynamic add a column and index.
+ */
+ @Test
+ public void
testCheckCacheSizeWhenBrokenCacheWithDynamicAddColumnAndIndex() {
+ IgniteEx node = crd;
+ String cacheName = CACHE_NAME;
+
+ int addCnt = ENTRY_CNT;
+ addColumnAndIdx(node, cacheName, addCnt);
+
+ validateCheckSizesAfterBreakCacheDataTree(node, cacheName, ENTRY_CNT +
addCnt);
+ }
+
+ /**
+ * Test checks that an error will be displayed checking cache size and
+ * index when index is broken. In case of dynamic add a column and index.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCheckCacheSizeWhenBrokenIdxWithDynamicAddColumnAndIndex()
throws Exception {
+ IgniteEx node = crd;
+ String cacheName = CACHE_NAME;
+
+ int addCnt = ENTRY_CNT;
+ addColumnAndIdx(node, cacheName, addCnt);
+
+ validateCheckSizesAfterBreakSqlIndex(node, cacheName, ENTRY_CNT +
addCnt);
+ }
+
+ /**
+ * Test that checks that there will be no errors when executing command
+ * "validate_indexes" with/without "--check-sizes" on cache with
+ * {@link QueryEntity}.
+ */
+ @Test
+ public void testNoErrorOnCacheWithQueryEntity() {
+ String cacheName = CACHE_NAME;
+
+ execVIWithNoErrCheck(cacheName, false);
+ execVIWithNoErrCheck(cacheName, true);
+ }
+
+ /**
+ * Adding the "address" column and index for {@link Person} and
+ * {@link Organization}, with new entries added for each of them.
+ *
+ * @param node Node.
+ * @param cacheName Cache name.
+ * @param addCnt How many entries add to table.
+ * */
+ private void addColumnAndIdx(IgniteEx node, String cacheName, int addCnt) {
+ IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+ cache.query(new SqlFieldsQuery("alter table Person add column orgAddr
varchar")).getAll();
+ cache.query(new SqlFieldsQuery("alter table Organization add column
addr varchar")).getAll();
+
+ cache.query(new SqlFieldsQuery("create index p_o_addr on Person
(orgAddr)")).getAll();
+ cache.query(new SqlFieldsQuery("create index o_addr on Organization
(addr)")).getAll();
+
+ int key = node.cachex(cacheName).size();
+
+ try (IgniteDataStreamer<Object, Object> streamer =
node.dataStreamer(cacheName)) {
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+ for (int i = 0; i < addCnt; i++) {
+ streamer.addData(
+ key++,
+ new Person(rand.nextInt(),
valueOf(rand.nextLong())).orgAddr(valueOf(rand.nextLong()))
+ );
+
+ streamer.addData(
+ key++,
+ new Organization(rand.nextInt(),
valueOf(rand.nextLong())).addr(valueOf(rand.nextLong()))
+ );
+ }
+
+ streamer.flush();
+ }
+ }
+
+ /**
+ * Validation of cache and index sizes after breaking sql index.
+ *
+ * @param node Node.
+ * @param cacheName Cache name.
+ * @param entryCnt Entry count for table.
+ */
+ private void validateCheckSizesAfterBreakSqlIndex(IgniteEx node, String
cacheName, int entryCnt) throws Exception {
+ Map<String, AtomicInteger> rmvIdxByTbl = new HashMap<>();
+
+ breakSqlIndex(node.cachex(cacheName), 0, row -> {
+ rmvIdxByTbl.computeIfAbsent(tableName(cacheName, row), s -> new
AtomicInteger()).incrementAndGet();
+ return true;
+ });
+
+ assertEquals(rmvIdxByTbl.size(), queryEntities().size());
+ validateCheckSizes(node, cacheName, rmvIdxByTbl, ai -> entryCnt, ai ->
entryCnt - ai.get());
+ }
+
+ /**
+ * Validation of cache and index sizes after breaking CacheDataTree.
+ *
+ * @param node Node.
+ * @param cacheName Cache name.
+ * @param entryCnt Entry count for table.
+ */
+ private void validateCheckSizesAfterBreakCacheDataTree(IgniteEx node,
String cacheName, int entryCnt) {
+ requireNonNull(cacheName);
+ requireNonNull(node);
+
+ Map<String, AtomicInteger> rmvEntryByTbl = new HashMap<>();
+
+ breakCacheDataTree(log, node.cachex(cacheName), 1, (i, entry) -> {
+ rmvEntryByTbl.computeIfAbsent(tableName(cacheName, entry), s ->
new AtomicInteger()).incrementAndGet();
+ return true;
+ });
+
+ assertEquals(rmvEntryByTbl.size(), queryEntities().size());
+ validateCheckSizes(node, cacheName, rmvEntryByTbl, ai -> entryCnt -
ai.get(), ai -> entryCnt);
+ }
+
+ /**
+ * Creating {@link QueryEntity}'s with filling functions.
+ *
+ * @return {@link QueryEntity}'s with filling functions.
+ */
+ private Map<QueryEntity, Function<Random, Object>> queryEntities() {
+ Map<QueryEntity, Function<Random, Object>> qryEntities = new
HashMap<>();
+
+ qryEntities.put(personEntity(), rand -> new Person(rand.nextInt(),
valueOf(rand.nextLong())));
+ qryEntities.put(organizationEntity(), rand -> new
Organization(rand.nextInt(), valueOf(rand.nextLong())));
+
+ return qryEntities;
+ }
+
+ /**
+ * Executing "validate_indexes" command with verify that there are
+ * no errors in result.
+ *
+ * @param cacheName Cache name.
+ * @param checkSizes Add argument "--check-sizes".
+ */
+ private void execVIWithNoErrCheck(String cacheName, boolean checkSizes) {
+ List<String> cmdWithArgs = new ArrayList<>(asList(CACHE.text(),
VALIDATE_INDEXES.text(), cacheName));
+
+ if (checkSizes)
+ cmdWithArgs.add(CHECK_SIZES.argName());
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute(cmdWithArgs));
+
+ String out = testOut.toString();
+ assertNotContains(log, out, "issues found (listed above)");
+ assertNotContains(log, out, "Size check");
+ }
+
+ /**
+ * Check that if data is broken and option
+ * {@link ValidateIndexesCommandArg#CHECK_SIZES} is enabled, size check
+ * will not take place.
+ *
+ * @param cacheName Cache size.
+ */
+ private void checkNoCheckSizeInCaseBrokenData(String cacheName) {
+ injectTestSystemOut();
+
+ assertEquals(
+ EXIT_CODE_OK,
+ execute(CACHE.text(), VALIDATE_INDEXES.text(), cacheName)
+ );
+
+ String out = testOut.toString();
+ assertContains(log, out, "issues found (listed above)");
+ assertNotContains(log, out, "Size check");
+ }
+
+ /**
+ * Checking whether cache and index size check is correct.
+ *
+ * @param node Node.
+ * @param cacheName Cache name.
+ * @param rmvByTbl Number of deleted items per table.
+ * @param cacheSizeExp Function for getting expected cache size.
+ * @param idxSizeExp Function for getting expected index size.
+ */
+ private void validateCheckSizes(
+ IgniteEx node,
+ String cacheName,
+ Map<String, AtomicInteger> rmvByTbl,
+ Function<AtomicInteger, Integer> cacheSizeExp,
+ Function<AtomicInteger, Integer> idxSizeExp
+ ) {
+ requireNonNull(node);
+ requireNonNull(cacheName);
+ requireNonNull(rmvByTbl);
+ requireNonNull(cacheSizeExp);
+ requireNonNull(idxSizeExp);
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute(CACHE.text(),
VALIDATE_INDEXES.text(), cacheName, CHECK_SIZES.argName()));
+
+ String out = testOut.toString();
+ assertContains(log, out, "issues found (listed above)");
+ assertContains(log, out, "Size check");
+
+ Map<String, ValidateIndexesCheckSizeResult> valIdxCheckSizeResults =
+
((VisorValidateIndexesTaskResult)lastOperationResult).results().get(node.localNode().id())
+ .checkSizeResult();
+
+ assertEquals(rmvByTbl.size(), valIdxCheckSizeResults.size());
+
+ for (Map.Entry<String, AtomicInteger> rmvByTblEntry :
rmvByTbl.entrySet()) {
+ ValidateIndexesCheckSizeResult checkSizeRes =
valIdxCheckSizeResults.entrySet().stream()
+ .filter(e -> e.getKey().contains(rmvByTblEntry.getKey()))
+ .map(Map.Entry::getValue)
+ .findAny()
+ .orElse(null);
+
+ assertNotNull(checkSizeRes);
+ assertEquals((int)cacheSizeExp.apply(rmvByTblEntry.getValue()),
checkSizeRes.cacheSize());
+
+ Collection<ValidateIndexesCheckSizeIssue> issues =
checkSizeRes.issues();
+ assertFalse(issues.isEmpty());
+
+ issues.forEach(issue -> {
+ assertEquals((int)idxSizeExp.apply(rmvByTblEntry.getValue()),
issue.indexSize());
+
+ Throwable err = issue.error();
+ assertNotNull(err);
+ assertEquals("Cache and index size not same.",
err.getMessage());
+ });
+ }
+ }
+
+ /**
+ * Get table name for cache row.
+ *
+ * @param cacheName Cache name.
+ * @param cacheDataRow Cache row.
+ */
+ private String tableName(String cacheName, CacheDataRow cacheDataRow) {
+ requireNonNull(cacheName);
+ requireNonNull(cacheDataRow);
+
+ try {
+ return crd.context().query().typeByValue(
+ cacheName,
+ crd.cachex(cacheName).context().cacheObjectContext(),
+ cacheDataRow.key(),
+ cacheDataRow.value(),
+ false
+ ).tableName();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Get table name for cache entry.
+ *
+ * @param cacheName Cache name.
+ * @param cacheEntry Cache entry.
+ */
+ private <K, V> String tableName(String cacheName, Cache.Entry<K, V>
cacheEntry) {
+ requireNonNull(cacheName);
+ requireNonNull(cacheEntry);
+
+ try {
+ return crd.context().query().typeByValue(
+ cacheName,
+ crd.cachex(cacheName).context().cacheObjectContext(),
+ null,
+ ((CacheObject)cacheEntry.getValue()),
+ false
+ ).tableName();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingClusterByClassTest.java
b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingClusterByClassTest.java
index 66addd2..4b2b8ab 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingClusterByClassTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingClusterByClassTest.java
@@ -17,30 +17,15 @@
package org.apache.ignite.util;
-import java.util.Iterator;
-import javax.cache.Cache;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.query.ScanQuery;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.tree.SearchRow;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.util.lang.GridIterator;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakCacheDataTree;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakSqlIndex;
+import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.CACHE_NAME;
import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.GROUP_NAME;
-import static
org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
/**
* You can use this class if you don't need create nodes for each test because
@@ -97,7 +82,7 @@ public class GridCommandHandlerIndexingClusterByClassTest
extends GridCommandHan
*/
@Test
public void testBrokenCacheDataTreeShouldFailValidation() {
- breakCacheDataTree(crd, CACHE_NAME, 1);
+ breakCacheDataTreeOnCrd();
injectTestSystemOut();
@@ -122,7 +107,7 @@ public class GridCommandHandlerIndexingClusterByClassTest
extends GridCommandHan
*/
@Test
public void
testBrokenCacheDataTreeShouldFailValidationWithCacheGroupInfo() {
- breakCacheDataTree(crd, CACHE_NAME, 1);
+ breakCacheDataTreeOnCrd();
injectTestSystemOut();
@@ -140,7 +125,7 @@ public class GridCommandHandlerIndexingClusterByClassTest
extends GridCommandHan
*/
@Test
public void testBrokenSqlIndexShouldFailValidation() throws Exception {
- breakSqlIndex(crd, CACHE_NAME);
+ breakSqlIndexOnCrd();
injectTestSystemOut();
@@ -158,7 +143,7 @@ public class GridCommandHandlerIndexingClusterByClassTest
extends GridCommandHan
forceCheckpoint();
- breakCacheDataTree(crd, CACHE_NAME, 1);
+ breakCacheDataTreeOnCrd();
injectTestSystemOut();
@@ -178,100 +163,19 @@ public class
GridCommandHandlerIndexingClusterByClassTest extends GridCommandHan
}
/**
- * Removes some entries from a partition skipping index update. This
effectively breaks the index.
+ * Removes some entries from a partition skipping index update.
*/
- private void breakCacheDataTree(Ignite ig, String cacheName, int partId) {
- IgniteEx ig0 = (IgniteEx)ig;
- int cacheId = CU.cacheId(cacheName);
-
- ScanQuery scanQry = new ScanQuery(partId);
-
- GridCacheContext<Object, Object> ctx =
ig0.context().cache().context().cacheContext(cacheId);
-
- // Get current update counter
- String grpName =
ig0.context().cache().context().cacheContext(cacheId).config().getGroupName();
- int cacheGrpId = grpName == null ? cacheName.hashCode() :
grpName.hashCode();
-
- GridDhtLocalPartition locPart =
ctx.dht().topology().localPartition(partId);
- IgniteCacheOffheapManager.CacheDataStore dataStore =
ig0.context().cache().context().cache().cacheGroup(cacheGrpId).offheap().dataStore(locPart);
-
- Iterator<Cache.Entry> it =
ig.cache(cacheName).withKeepBinary().query(scanQry).iterator();
-
- for (int i = 0; i < 5_000; i++) {
- if (it.hasNext()) {
- Cache.Entry entry = it.next();
-
- if (i % 5 == 0) {
- // Do update
- GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
-
- db.checkpointReadLock();
-
- try {
- IgniteCacheOffheapManager.CacheDataStore innerStore =
U.field(dataStore, "delegate");
-
- // IgniteCacheOffheapManagerImpl.CacheDataRowStore
- Object rowStore = U.field(innerStore, "rowStore");
-
- // IgniteCacheOffheapManagerImpl.CacheDataTree
- Object dataTree = U.field(innerStore, "dataTree");
-
- CacheDataRow oldRow = U.invoke(
- dataTree.getClass(),
- dataTree,
- "remove",
- new SearchRow(cacheId,
ctx.toCacheKeyObject(entry.getKey())));
-
- if (oldRow != null)
- U.invoke(rowStore.getClass(), rowStore,
"removeRow", oldRow.link(), IoStatisticsHolderNoOp.INSTANCE);
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to remove key skipping indexes: " +
entry);
-
- e.printStackTrace();
- }
- finally {
- db.checkpointReadUnlock();
- }
- }
- }
- else {
- log.info("Early exit for index corruption, keys processed: " +
i);
-
- break;
- }
- }
+ private void breakCacheDataTreeOnCrd() {
+ breakCacheDataTree(log, crd.cachex(CACHE_NAME), 1, (i, entry) -> i % 5
== 0);
}
/**
- * Removes some entries from H2 trees skipping partition updates. This
effectively breaks the index.
+ * Removes some entries from H2 trees skipping partition updates.
+ * This effectively breaks the index.
+ *
+ * @throws Exception If failed.
*/
- private void breakSqlIndex(Ignite ig, String cacheName) throws Exception {
- GridQueryProcessor qry = ((IgniteEx)ig).context().query();
-
- GridCacheContext<Object, Object> ctx =
((IgniteEx)ig).cachex(cacheName).context();
-
- GridDhtLocalPartition locPart =
ctx.topology().localPartitions().get(0);
-
- GridIterator<CacheDataRow> it =
ctx.group().offheap().partitionIterator(locPart.id());
-
- for (int i = 0; i < 500; i++) {
- if (!it.hasNextX()) {
- log.info("Early exit for index corruption, keys processed: " +
i);
-
- break;
- }
-
- CacheDataRow row = it.nextX();
-
- ctx.shared().database().checkpointReadLock();
-
- try {
- qry.remove(ctx, row);
- }
- finally {
- ctx.shared().database().checkpointReadUnlock();
- }
- }
+ private void breakSqlIndexOnCrd() throws Exception {
+ breakSqlIndex(crd.cachex(CACHE_NAME), 0, null);
}
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java
b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java
index feb2a73..5d99425 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingUtils.java
@@ -18,22 +18,52 @@
package org.apache.ignite.util;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.cache.Cache.Entry;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.cache.tree.SearchRow;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
import static java.lang.String.valueOf;
import static java.util.Arrays.asList;
+import static java.util.Collections.singletonMap;
import static java.util.Objects.nonNull;
+import static java.util.Objects.requireNonNull;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static
org.apache.ignite.internal.metric.IoStatisticsHolderNoOp.INSTANCE;
+import static org.apache.ignite.internal.util.IgniteUtils.field;
+import static org.apache.ignite.internal.util.IgniteUtils.hasField;
/**
* Utility class for tests.
@@ -51,7 +81,27 @@ public class GridCommandHandlerIndexingUtils {
}
/**
- * Create and fill cache. Key - integer, value - {@code Person}.
+ * Create and fill cache. Key - integer, value - {@link Person}.
+ * {@link #createAndFillCache(Ignite, String, String, String, Map, int)}
+ * is used with {@link #personEntity()}, default dataRegion and cnt=10_000.
+ *
+ * @param ignite Node.
+ * @param cacheName Cache name.
+ * @param grpName Group name.
+ */
+ public static void createAndFillCache(Ignite ignite, String cacheName,
String grpName) {
+ createAndFillCache(
+ ignite,
+ cacheName,
+ grpName,
+ null,
+ singletonMap(personEntity(), rand -> new Person(rand.nextInt(),
valueOf(rand.nextLong()))),
+ 10_000
+ );
+ }
+
+ /**
+ * Create and fill cache.
* <br/>
* <table class="doctable">
* <th>Cache parameter</th>
@@ -69,65 +119,196 @@ public class GridCommandHandlerIndexingUtils {
* <td>1</td>
* </tr>
* <tr>
- * <td>Query entities</td>
- * <td>{@link #personEntity()}</td>
- * </tr>
- * <tr>
* <td>Affinity</td>
* <td>{@link RendezvousAffinityFunction} with exclNeighbors = false,
parts = 32</td>
* </tr>
* </table>
*
- *
* @param ignite Node.
* @param cacheName Cache name.
* @param grpName Group name.
- * @see Person
- * */
- public static void createAndFillCache(final Ignite ignite, final String
cacheName, final String grpName) {
- assert nonNull(ignite);
- assert nonNull(cacheName);
- assert nonNull(grpName);
-
- ignite.createCache(new CacheConfiguration<Integer, Person>()
+ * @param dataRegionName DataRegionConfiguration name, null for default.
+ * @param qryEntities QueryEntities and functions for creating them.
+ * @param cnt How many entities create for each {@code qryEntities}.
+ */
+ public static void createAndFillCache(
+ Ignite ignite,
+ String cacheName,
+ String grpName,
+ @Nullable String dataRegionName,
+ Map<QueryEntity, Function<Random, Object>> qryEntities,
+ int cnt
+ ) {
+ requireNonNull(ignite);
+ requireNonNull(cacheName);
+ requireNonNull(grpName);
+ requireNonNull(qryEntities);
+
+ ignite.createCache(new CacheConfiguration<>()
.setName(cacheName)
.setGroupName(grpName)
+ .setDataRegionName(dataRegionName)
.setWriteSynchronizationMode(FULL_SYNC)
.setAtomicityMode(ATOMIC)
.setBackups(1)
- .setQueryEntities(F.asList(personEntity()))
+ .setQueryEntities(new ArrayList<>(qryEntities.keySet()))
.setAffinity(new RendezvousAffinityFunction(false, 32)));
ThreadLocalRandom rand = ThreadLocalRandom.current();
- try (IgniteDataStreamer<Integer, Person> streamer =
ignite.dataStreamer(cacheName)) {
- for (int i = 0; i < 10_000; i++)
- streamer.addData(i, new Person(rand.nextInt(),
valueOf(rand.nextLong())));
+ try (IgniteDataStreamer<Integer, Object> streamer =
ignite.dataStreamer(cacheName)) {
+ int entity = 0;
+ for (Function<Random, Object> fun : qryEntities.values()) {
+ for (int i = 0; i < cnt; i++)
+ streamer.addData(i + (entity * cnt), fun.apply(rand));
+
+ streamer.flush();
+ entity++;
+ }
+ }
+ }
+
+ /**
+ * Deleting a rows from the cache without updating indexes.
+ *
+ * @param log Logger.
+ * @param internalCache Cache.
+ * @param partId Partition number.
+ * @param filter Entry filter.
+ */
+ static <K, V> void breakCacheDataTree(
+ IgniteLogger log,
+ IgniteInternalCache<K, V> internalCache,
+ int partId,
+ @Nullable BiPredicate<Integer, Entry<K, V>> filter
+ ) {
+ requireNonNull(log);
+ requireNonNull(internalCache);
+
+ GridCacheContext<K, V> cacheCtx = internalCache.context();
+
+ GridDhtLocalPartition dhtLocPart =
cacheCtx.dht().topology().localPartition(partId);
+
+ CacheDataStore cacheDataStore =
cacheCtx.group().offheap().dataStore(dhtLocPart);
+
+ String delegate = "delegate";
+ if (hasField(cacheDataStore, delegate))
+ cacheDataStore = field(cacheDataStore, delegate);
+
+ CacheDataRowStore cacheDataRowStore = field(cacheDataStore,
"rowStore");
+ CacheDataTree cacheDataTree = field(cacheDataStore, "dataTree");
+
+ String cacheName = internalCache.name();
+
+ QueryCursor<Entry<K, V>> qryCursor =
cacheCtx.kernalContext().grid().cache(cacheName).withKeepBinary()
+ .query(new ScanQuery<>(partId));
+
+ Iterator<Entry<K, V>> cacheEntryIter = qryCursor.iterator();
+
+ IgniteCacheDatabaseSharedManager db = cacheCtx.shared().database();
+ int cacheId = CU.cacheId(cacheName);
+ int i = 0;
+
+ while (cacheEntryIter.hasNext()) {
+ Entry<K, V> entry = cacheEntryIter.next();
+
+ if (nonNull(filter) && !filter.test(i++, entry))
+ continue;
+
+ db.checkpointReadLock();
+
+ try {
+ CacheDataRow oldRow = cacheDataTree.remove(
+ new SearchRow(cacheId,
cacheCtx.toCacheKeyObject(entry.getKey()))
+ );
+
+ if (nonNull(oldRow))
+ cacheDataRowStore.removeRow(oldRow.link(), INSTANCE);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to remove key skipping
indexes: " + entry, e);
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
}
}
/**
- * Create query entity.
+ * Deleting records from the index bypassing cache.
+ *
+ * @param internalCache Cache.
+ * @param partId Partition number.
+ * @param filter Row filter.
+ * @throws Exception If failed.
*/
- private static QueryEntity personEntity() {
- QueryEntity entity = new QueryEntity();
+ static <K, V> void breakSqlIndex(
+ IgniteInternalCache<K, V> internalCache,
+ int partId,
+ @Nullable Predicate<CacheDataRow> filter
+ ) throws Exception {
+ requireNonNull(internalCache);
+
+ GridCacheContext<K, V> cacheCtx = internalCache.context();
+
+ GridDhtLocalPartition locPart =
cacheCtx.topology().localPartitions().get(partId);
+ GridIterator<CacheDataRow> cacheDataGridIter =
cacheCtx.group().offheap().partitionIterator(locPart.id());
+
+ GridQueryProcessor qryProcessor =
internalCache.context().kernalContext().query();
+
+ while (cacheDataGridIter.hasNextX()){
+ CacheDataRow cacheDataRow = cacheDataGridIter.nextX();
+
+ if (nonNull(filter) && !filter.test(cacheDataRow))
+ continue;
- entity.setKeyType(Integer.class.getName());
- entity.setValueType(Person.class.getName());
+ cacheCtx.shared().database().checkpointReadLock();
+ try {
+ qryProcessor.remove(cacheCtx, cacheDataRow);
+ }
+ finally {
+ cacheCtx.shared().database().checkpointReadUnlock();
+ }
+ }
+ }
+
+ /**
+ * Create query {@link Person} entity.
+ *
+ * @return Query {@link Person} entity.
+ */
+ static QueryEntity personEntity() {
String orgIdField = "orgId";
String nameField = "name";
- entity.addQueryField(orgIdField, Integer.class.getName(), null);
- entity.addQueryField(nameField, String.class.getName(), null);
+ return new QueryEntity()
+ .setKeyType(Integer.class.getName())
+ .setValueType(Person.class.getName())
+ .addQueryField(orgIdField, Integer.class.getName(), null)
+ .addQueryField(nameField, String.class.getName(), null)
+ .setIndexes(asList(new QueryIndex(nameField), new
QueryIndex(orgIdField)));
+ }
- entity.setIndexes(asList(new QueryIndex(nameField), new
QueryIndex(orgIdField)));
+ /**
+ * Create query {@link Person} entity.
+ *
+ * @return Query {@link Person} entity.
+ */
+ static QueryEntity organizationEntity() {
+ String idField = "id";
+ String nameField = "name";
- return entity;
+ return new QueryEntity()
+ .setKeyType(Integer.class.getName())
+ .setValueType(Organization.class.getName())
+ .addQueryField(idField, Integer.class.getName(), null)
+ .addQueryField(nameField, String.class.getName(), null)
+ .setIndexes(asList(new QueryIndex(nameField), new
QueryIndex(idField)));
}
/**
- * Simple class for tests.
+ * Simple class Person for tests.
*/
static class Person implements Serializable {
/** Id organization. */
@@ -136,15 +317,67 @@ public class GridCommandHandlerIndexingUtils {
/** Name organization. */
String name;
+ /** Address of organization. */
+ String orgAddr;
+
/**
* Constructor.
*
- * @param orgId Organization ID.
- * @param name Name.
+ * @param orgId Organization id.
+ * @param name Organization name.
*/
- public Person(int orgId, String name) {
+ Person(int orgId, String name) {
this.orgId = orgId;
this.name = name;
}
+
+ /**
+ * Set address of organization.
+ *
+ * @param orgAddr Address of organization.
+ * @return Current instance.
+ */
+ public Person orgAddr(String orgAddr) {
+ this.orgAddr = orgAddr;
+
+ return this;
+ }
+ }
+
+ /**
+ * Simple class Organization for tests.
+ */
+ static class Organization implements Serializable {
+ /** Id. */
+ int id;
+
+ /** Name. */
+ String name;
+
+ /** Address. */
+ String addr;
+
+ /**
+ * Constructor.
+ *
+ * @param id Id.
+ * @param name Name.
+ */
+ Organization(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * Set address.
+ *
+ * @param addr Address.
+ * @return Current instance.
+ */
+ public Organization addr(String addr) {
+ this.addr = addr;
+
+ return this;
+ }
}
}