This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d53a91f IGNITE-10507 control.sh: add ability to check CRC sums of
stored pages - Fixes #5803.
d53a91f is described below
commit d53a91f0ff6efd678308726403a24b07133156a4
Author: Sergey Antonov <[email protected]>
AuthorDate: Fri Jan 18 13:54:36 2019 +0300
IGNITE-10507 control.sh: add ability to check CRC sums of stored pages -
Fixes #5803.
Signed-off-by: Ivan Rakov <[email protected]>
---
.../internal/commandline/CommandHandler.java | 33 +-
.../internal/commandline/cache/CacheArguments.java | 17 +
.../processors/cache/mvcc/txlog/TxLog.java | 7 +-
.../cache/persistence/DbCheckpointListener.java | 10 +
.../GridCacheDatabaseSharedManager.java | 140 +++++++--
.../cache/persistence/GridCacheOffheapManager.java | 5 +
.../cache/persistence/file/FilePageStore.java | 7 +
.../cache/persistence/metastorage/MetaStorage.java | 7 +-
.../cache/verify/GridNotIdleException.java | 69 +++++
.../cache/verify/IdleVerifyException.java | 61 ++++
.../cache/verify/IdleVerifyResultV2.java | 124 ++++++--
.../processors/cache/verify/IdleVerifyUtility.java | 120 +++++++
.../verify/VerifyBackupPartitionsDumpTask.java | 108 ++++---
.../cache/verify/VerifyBackupPartitionsTaskV2.java | 344 +++++++++++++++------
.../visor/verify/VisorIdleVerifyDumpTaskArg.java | 62 +++-
.../internal/visor/verify/VisorIdleVerifyJob.java | 13 +-
.../visor/verify/VisorIdleVerifyTaskArg.java | 79 ++++-
.../visor/verify/VisorIdleVerifyTaskV2.java | 58 +---
.../wal/memtracker/PageMemoryTracker.java | 18 +-
.../apache/ignite/testframework/GridTestUtils.java | 11 +
.../apache/ignite/util/GridCommandHandlerTest.java | 97 +++++-
.../visor/verify/ValidateIndexesClosure.java | 105 ++++---
.../IgnitePersistentStoreSchemaLoadTest.java | 6 +-
23 files changed, 1130 insertions(+), 371 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index bf596e7..b84e2b9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.commandline;
import java.io.Console;
import java.io.IOException;
import java.net.InetAddress;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -198,7 +199,7 @@ public class CommandHandler {
private static final String CMD_SKIP_ZEROS = "--skip-zeros";
/** Command exclude caches. */
- private static final String CMD_EXCLUDE_CACHES = "--excludeCaches";
+ private static final String CMD_EXCLUDE_CACHES = "--exclude-caches";
/** Cache filter. */
private static final String CACHE_FILTER = "--cache-filter";
@@ -368,6 +369,9 @@ public class CommandHandler {
/** */
private static final String CONFIG = "--config";
+ /** */
+ private static final String IDLE_CHECK_CRC = "--check-crc";
+
/** Utility name. */
private static final String UTILITY_NAME = "control.sh";
@@ -1041,7 +1045,10 @@ public class CommandHandler {
*/
private void legacyCacheIdleVerify(GridClient client, CacheArguments
cacheArgs) throws GridClientException {
VisorIdleVerifyTaskResult res = executeTask(
- client, VisorIdleVerifyTask.class, new
VisorIdleVerifyTaskArg(cacheArgs.caches(), cacheArgs.excludeCaches()));
+ client,
+ VisorIdleVerifyTask.class,
+ new VisorIdleVerifyTaskArg(cacheArgs.caches(),
cacheArgs.excludeCaches(), cacheArgs.idleCheckCrc())
+ );
Map<PartitionKey, List<PartitionHashRecord>> conflicts =
res.getConflicts();
@@ -1174,7 +1181,6 @@ public class CommandHandler {
* @param cacheArgs Cache args.
*/
private void cacheResetLostPartitions(GridClient client, CacheArguments
cacheArgs) throws GridClientException {
-
CacheResetLostPartitionsTaskArg taskArg = new
CacheResetLostPartitionsTaskArg(cacheArgs.caches());
CacheResetLostPartitionsTaskResult res =
executeTaskByNameOnNode(client, CacheResetLostPartitionsTask.class.getName(),
taskArg, null);
@@ -1187,13 +1193,16 @@ public class CommandHandler {
* @param cacheArgs Cache args.
*/
private void cacheIdleVerifyDump(GridClient client, CacheArguments
cacheArgs) throws GridClientException {
- String path = executeTask(
- client,
- VisorIdleVerifyDumpTask.class,
- new VisorIdleVerifyDumpTaskArg(cacheArgs.caches(),
cacheArgs.excludeCaches(), cacheArgs.isSkipZeros(), cacheArgs
- .getCacheFilterEnum())
+ VisorIdleVerifyDumpTaskArg arg = new VisorIdleVerifyDumpTaskArg(
+ cacheArgs.caches(),
+ cacheArgs.excludeCaches(),
+ cacheArgs.isSkipZeros(),
+ cacheArgs.getCacheFilterEnum(),
+ cacheArgs.idleCheckCrc()
);
+ String path = executeTask(client, VisorIdleVerifyDumpTask.class, arg);
+
log("VisorIdleVerifyDumpTask successfully written output to '" + path
+ "'");
}
@@ -1203,7 +1212,10 @@ public class CommandHandler {
*/
private void cacheIdleVerifyV2(GridClient client, CacheArguments
cacheArgs) throws GridClientException {
IdleVerifyResultV2 res = executeTask(
- client, VisorIdleVerifyTaskV2.class, new
VisorIdleVerifyTaskArg(cacheArgs.caches(), cacheArgs.excludeCaches()));
+ client,
+ VisorIdleVerifyTaskV2.class,
+ new
VisorIdleVerifyTaskArg(cacheArgs.caches(),cacheArgs.excludeCaches(),
cacheArgs.idleCheckCrc())
+ );
res.print(System.out::print);
}
@@ -2145,6 +2157,8 @@ public class CommandHandler {
}
else if (CMD_SKIP_ZEROS.equals(nextArg))
cacheArgs.skipZeros(true);
+ else if (IDLE_CHECK_CRC.equals(nextArg))
+ cacheArgs.idleCheckCrc(true);
else if (CACHE_FILTER.equals(nextArg)) {
if (cacheArgs.caches() != null ||
cacheArgs.excludeCaches() != null)
throw new
IllegalArgumentException(ONE_CACHE_FILTER_OPT_SHOULD_USED_MSG);
@@ -2756,6 +2770,7 @@ public class CommandHandler {
log("Control utility [ver. " + ACK_VER_STR + "]");
log(COPYRIGHT);
log("User: " + System.getProperty("user.name"));
+ log("Time: " + LocalDateTime.now());
log(DELIM);
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java
index fb5bc88..1407bef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java
@@ -78,6 +78,9 @@ public class CacheArguments {
/** Cache filter. */
private CacheFilterEnum cacheFilterEnum = CacheFilterEnum.ALL;
+ /** Check CRC sum on idle verify. */
+ private boolean idleCheckCrc;
+
/**
* @return Gets filter of caches, which will by checked.
*/
@@ -307,4 +310,18 @@ public class CacheArguments {
* @param outputFormat New output format.
*/
public void outputFormat(OutputFormat outputFormat) { this.outputFormat =
outputFormat; }
+
+ /**
+ * @return Check page CRC sum on idle verify flag.
+ */
+ public boolean idleCheckCrc() {
+ return idleCheckCrc;
+ }
+
+ /**
+ * @param idleCheckCrc Check page CRC sum on idle verify flag.
+ */
+ public void idleCheckCrc(boolean idleCheckCrc) {
+ this.idleCheckCrc = idleCheckCrc;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
index ca3053c..6b4d4fb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -188,7 +188,7 @@ public class TxLog implements DbCheckpointListener {
}
/** {@inheritDoc} */
- @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
Executor executor = ctx.executor();
if (executor == null)
@@ -205,6 +205,11 @@ public class TxLog implements DbCheckpointListener {
}
}
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ /* No-op. */
+ }
+
/**
*
* @param major Major version.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
index d64bbfe..bf2f13a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
@@ -49,10 +49,20 @@ public interface DbCheckpointListener {
* @return Context executor.
*/
@Nullable public Executor executor();
+
+ /**
+ * @return {@code True} if at least one page is dirty.
+ */
+ public boolean hasPages();
}
/**
* @throws IgniteCheckedException If failed.
*/
+ public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException;
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
public void onCheckpointBegin(Context ctx) throws IgniteCheckedException;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 1ff982d..ce36bab 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -3085,6 +3085,13 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
tmpWriteBuf.order(ByteOrder.nativeOrder());
}
+ /**
+ * @return Progress of the current checkpoint or {@code null} if there is no
checkpoint at this moment.
+ */
+ public @Nullable CheckpointProgress currentProgress(){
+ return curCpProgress;
+ }
+
/** {@inheritDoc} */
@Override protected void body() {
Throwable err = null;
@@ -3604,6 +3611,8 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
checkpointLock.writeLock().lock();
+ DbCheckpointListener.Context ctx0 = null;
+
try {
tracker.onMarkStart();
@@ -3625,41 +3634,11 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
GridCompoundFuture asyncLsnrFut = asyncRunner == null ? null :
new GridCompoundFuture();
- DbCheckpointListener.Context ctx0 = new
DbCheckpointListener.Context() {
- @Override public boolean nextSnapshot() {
- return curr.nextSnapshot;
- }
-
- /** {@inheritDoc} */
- @Override public PartitionAllocationMap partitionStatMap()
{
- return map;
- }
-
- /** {@inheritDoc} */
- @Override public boolean needToSnapshot(String
cacheOrGrpName) {
- return
curr.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
- }
-
- /** {@inheritDoc} */
- @Override public Executor executor() {
- return asyncRunner == null ? null : cmd -> {
- try {
- GridFutureAdapter<?> res = new
GridFutureAdapter<>();
-
- asyncRunner.execute(U.wrapIgniteFuture(cmd,
res));
-
- asyncLsnrFut.add(res);
- }
- catch (RejectedExecutionException e) {
- assert false : "A task should never be
rejected by async runner";
- }
- };
- }
- };
+ ctx0 = createOnCheckpointMarkBeginContext(curr, map,
asyncLsnrFut);
// Listeners must be invoked before we write checkpoint record
to WAL.
for (DbCheckpointListener lsnr : lsnrs)
- lsnr.onCheckpointBegin(ctx0);
+ lsnr.onMarkCheckpointBegin(ctx0);
if (asyncLsnrFut != null) {
asyncLsnrFut.markInitialized();
@@ -3751,8 +3730,13 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
tracker.onLockRelease();
}
+ DbCheckpointListener.Context ctx =
createOnCheckpointBeginContext(ctx0, hasPages);
+
curr.cpBeginFut.onDone();
+ for (DbCheckpointListener lsnr : lsnrs)
+ lsnr.onCheckpointBegin(ctx);
+
if (snapFut != null) {
try {
snapFut.get();
@@ -3812,6 +3796,86 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
}
}
+ /** */
+ private DbCheckpointListener.Context createOnCheckpointBeginContext(
+ DbCheckpointListener.Context delegate,
+ boolean hasPages
+ ) {
+ return new DbCheckpointListener.Context() {
+ /** {@inheritDoc} */
+ @Override public boolean nextSnapshot() {
+ return delegate.nextSnapshot();
+ }
+
+ /** {@inheritDoc} */
+ @Override public PartitionAllocationMap partitionStatMap() {
+ return delegate.partitionStatMap();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needToSnapshot(String cacheOrGrpName)
{
+ return delegate.needToSnapshot(cacheOrGrpName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Executor executor() {
+ return delegate.executor();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasPages() {
+ return hasPages;
+ }
+ };
+ }
+
+ /** */
+ private DbCheckpointListener.Context
createOnCheckpointMarkBeginContext(
+ CheckpointProgress currCpProgress,
+ PartitionAllocationMap map,
+ GridCompoundFuture asyncLsnrFut
+ ) {
+ return new DbCheckpointListener.Context() {
+ /** {@inheritDoc} */
+ @Override public boolean nextSnapshot() {
+ return currCpProgress.nextSnapshot;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PartitionAllocationMap partitionStatMap() {
+ return map;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needToSnapshot(String cacheOrGrpName) {
+ return
currCpProgress.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Executor executor() {
+ return asyncRunner == null ? null : cmd -> {
+ try {
+ GridFutureAdapter<?> res = new
GridFutureAdapter<>();
+
+ asyncRunner.execute(U.wrapIgniteFuture(cmd, res));
+
+ asyncLsnrFut.add(res);
+ }
+ catch (RejectedExecutionException e) {
+ throw new IgniteException("A task should never be
rejected by async runner", e);
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasPages() {
+ throw new IllegalStateException(
+ "Property is unknown at this moment. You should use
onCheckpointBegin() method."
+ );
+ }
+ };
+ }
+
/**
* Check that at least one collection is not empty.
*
@@ -4257,7 +4321,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
/**
* Data class representing the state of running/scheduled checkpoint.
*/
- private static class CheckpointProgress {
+ public static class CheckpointProgress {
/** Scheduled time of checkpoint. */
private volatile long nextCpTs;
@@ -4295,6 +4359,16 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
private CheckpointProgress(long nextCpTs) {
this.nextCpTs = nextCpTs;
}
+
+ /** */
+ public boolean started() {
+ return cpBeginFut.isDone();
+ }
+
+ /** */
+ public boolean finished() {
+ return cpFinishFut.isDone();
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index f78428d..9d11e76 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -173,6 +173,11 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
assert grp.dataRegion().pageMemory() instanceof PageMemoryEx;
Executor execSvc = ctx.executor();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index a8fae08..8e0196e 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -714,6 +714,13 @@ public class FilePageStore implements PageStore {
}
/**
+ * @return File absolute path.
+ */
+ public String getFileAbsolutePath() {
+ return cfgFile.getAbsolutePath();
+ }
+
+ /**
*
*/
private long allocPage() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 0906605..dff2a03 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -571,7 +571,7 @@ public class MetaStorage implements DbCheckpointListener,
ReadWriteMetastorage {
}
/** {@inheritDoc} */
- @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws
IgniteCheckedException {
Executor executor = ctx.executor();
if (executor == null) {
@@ -600,6 +600,11 @@ public class MetaStorage implements DbCheckpointListener,
ReadWriteMetastorage {
}
}
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ /* No-op. */
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/GridNotIdleException.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/GridNotIdleException.java
new file mode 100644
index 0000000..8ea38f6
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/GridNotIdleException.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.verify;
+
+import org.apache.ignite.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Thrown when the cluster is expected to be idle but is not.
+ */
+public class GridNotIdleException extends IgniteException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create empty exception.
+ */
+ public GridNotIdleException() {
+ // No-op.
+ }
+
+ /**
+ * Creates new exception with given error message.
+ *
+ * @param msg Error message.
+ */
+ public GridNotIdleException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates new exception with given throwable as a cause and source of
error message.
+ *
+ * @param cause Non-null throwable cause.
+ */
+ public GridNotIdleException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates new exception with given error message and optional nested
exception.
+ *
+ * @param msg Error message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public GridNotIdleException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return getClass() + ": " + getMessage();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyException.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyException.java
new file mode 100644
index 0000000..a6d8a3f
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.verify;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * This exception is used to collect exceptions that occurred during {@link
VerifyBackupPartitionsTaskV2} execution.
+ */
+public class IdleVerifyException extends IgniteException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Occurred exceptions. */
+ private final Collection<IgniteException> exceptions;
+
+ /** */
+ public IdleVerifyException(Collection<IgniteException> exceptions) {
+ if(F.isEmpty(exceptions))
+ throw new IllegalArgumentException("Exceptions can't be empty!");
+
+ this.exceptions = exceptions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getMessage() {
+ return exceptions.stream()
+ .map(Throwable::getMessage)
+ .collect(Collectors.joining(", "));
+ }
+
+ /**
+ * @return Exceptions.
+ */
+ public Collection<IgniteException> exceptions() {
+ return exceptions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return getClass() + ": " + getMessage();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
index a153063..3ccfe00 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
@@ -16,23 +16,37 @@
*/
package org.apache.ignite.internal.processors.cache.verify;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.PrintWriter;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.function.Consumer;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.ignite.internal.commandline.cache.CacheCommand.IDLE_VERIFY;
/**
* Encapsulates result of {@link VerifyBackupPartitionsTaskV2}.
*/
public class IdleVerifyResultV2 extends VisorDataTransferObject {
/** */
+ public static final String IDLE_VERIFY_FILE_PREFIX = IDLE_VERIFY + "-";
+
+ /** Time formatter for log file name. */
+ private static final DateTimeFormatter TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss_SSS");
+
+ /** */
private static final long serialVersionUID = 0L;
/** Counter conflicts. */
@@ -45,18 +59,19 @@ public class IdleVerifyResultV2 extends
VisorDataTransferObject {
private Map<PartitionKeyV2, List<PartitionHashRecordV2>> movingPartitions;
/** Exceptions. */
- private Map<UUID, Exception> exceptions;
+ private Map<ClusterNode, Exception> exceptions;
/**
* @param cntrConflicts Counter conflicts.
* @param hashConflicts Hash conflicts.
* @param movingPartitions Moving partitions.
+ * @param exceptions Occurred exceptions.
*/
public IdleVerifyResultV2(
Map<PartitionKeyV2, List<PartitionHashRecordV2>> cntrConflicts,
Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashConflicts,
Map<PartitionKeyV2, List<PartitionHashRecordV2>> movingPartitions,
- Map<UUID, Exception> exceptions
+ Map<ClusterNode, Exception> exceptions
) {
this.cntrConflicts = cntrConflicts;
this.hashConflicts = hashConflicts;
@@ -125,51 +140,54 @@ public class IdleVerifyResultV2 extends
VisorDataTransferObject {
/**
* @return Exceptions on nodes.
*/
- public Map<UUID, Exception> exceptions() {
+ public Map<ClusterNode, Exception> exceptions() {
return exceptions;
}
/**
- * Print formatted result to given printer.
+ * Print formatted result to given printer. If exceptions are present,
their messages will be written to a log file.
*
* @param printer Consumer for handle formatted result.
+ * @return Path to the log file if exceptions are present, {@code null}
otherwise.
*/
- public void print(Consumer<String> printer) {
- if (!hasConflicts())
- printer.accept("idle_verify check has finished, no conflicts have
been found.\n");
- else {
- int cntrConflictsSize = counterConflicts().size();
- int hashConflictsSize = hashConflicts().size();
+ public @Nullable String print(Consumer<String> printer) {
+ print(printer, false);
- printer.accept("idle_verify check has finished, found " +
(cntrConflictsSize + hashConflictsSize) +
- " conflict partitions: [counterConflicts=" + cntrConflictsSize
+ ", hashConflicts=" +
- hashConflictsSize + "]\n");
+ if (!F.isEmpty(exceptions)) {
+ File f = new File(IDLE_VERIFY_FILE_PREFIX +
LocalDateTime.now().format(TIME_FORMATTER) + ".txt");
- if (!F.isEmpty(counterConflicts())) {
- printer.accept("Update counter conflicts:\n");
+ try (PrintWriter pw = new PrintWriter(f)) {
+ print(pw::write, true);
- for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>>
entry : counterConflicts().entrySet()) {
- printer.accept("Conflict partition: " + entry.getKey() +
"\n");
+ pw.flush();
- printer.accept("Partition instances: " + entry.getValue()
+ "\n");
- }
+ printer.accept("See log for additional information. " +
f.getAbsolutePath() + "\n");
- printer.accept("\n");
+ return f.getAbsolutePath();
}
+ catch (FileNotFoundException e) {
+ printer.accept("Can't write exceptions to file " +
f.getAbsolutePath() + " " + e.getMessage() + "\n");
- if (!F.isEmpty(hashConflicts())) {
- printer.accept("Hash conflicts:\n");
+ e.printStackTrace();
+ }
+ }
- for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>>
entry : hashConflicts().entrySet()) {
- printer.accept("Conflict partition: " + entry.getKey() +
"\n");
+ return null;
+ }
- printer.accept("Partition instances: " + entry.getValue()
+ "\n");
- }
+ /** */
+ private void print(Consumer<String> printer, boolean
printExceptionMessages) {
+ if (!F.isEmpty(exceptions)) {
+ int size = exceptions.size();
- printer.accept("\n");
- }
+ printer.accept("idle_verify failed on " + size + " node" + (size
== 1 ? "" : "s") + ".\n");
}
+ if (!hasConflicts())
+ printer.accept("idle_verify check has finished, no conflicts have
been found.\n");
+ else
+ printConflicts(printer);
+
if (!F.isEmpty(movingPartitions())) {
printer.accept("Verification was skipped for " +
movingPartitions().size() + " MOVING partitions:\n");
@@ -185,11 +203,51 @@ public class IdleVerifyResultV2 extends
VisorDataTransferObject {
if (!F.isEmpty(exceptions())) {
printer.accept("Idle verify failed on nodes:\n");
- for (Map.Entry<UUID, Exception> e : exceptions().entrySet()) {
- printer.accept("Node ID: " + e.getKey() + "\n");
- printer.accept("Exception message:" + "\n");
- printer.accept(e.getValue().getMessage() + "\n");
+ for (Map.Entry<ClusterNode, Exception> e :
exceptions().entrySet()) {
+ ClusterNode n = e.getKey();
+
+ printer.accept("Node ID: " + n.id() + " " + n.addresses() + "
consistent ID: " + n.consistentId() + "\n");
+
+ if (printExceptionMessages) {
+ printer.accept("Exception message:" + "\n");
+
+ printer.accept(e.getValue().getMessage() + "\n");
+ }
+ }
+ }
+ }
+
+ /** */
+ private void printConflicts(Consumer<String> printer) {
+ int cntrConflictsSize = counterConflicts().size();
+ int hashConflictsSize = hashConflicts().size();
+
+ printer.accept("idle_verify check has finished, found " +
(cntrConflictsSize + hashConflictsSize) +
+ " conflict partitions: [counterConflicts=" + cntrConflictsSize +
", hashConflicts=" +
+ hashConflictsSize + "]\n");
+
+ if (!F.isEmpty(counterConflicts())) {
+ printer.accept("Update counter conflicts:\n");
+
+ for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry
: counterConflicts().entrySet()) {
+ printer.accept("Conflict partition: " + entry.getKey() + "\n");
+
+ printer.accept("Partition instances: " + entry.getValue() +
"\n");
+ }
+
+ printer.accept("\n");
+ }
+
+ if (!F.isEmpty(hashConflicts())) {
+ printer.accept("Hash conflicts:\n");
+
+ for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry
: hashConflicts().entrySet()) {
+ printer.accept("Conflict partition: " + entry.getKey() + "\n");
+
+ printer.accept("Partition instances: " + entry.getValue() +
"\n");
}
+
+ printer.accept("\n");
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
new file mode 100644
index 0000000..d386ec3
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.verify;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility class for idle verify command.
+ */
+public class IdleVerifyUtility {
+ /** Cluster not idle message. */
+ public static final String CLUSTER_NOT_IDLE_MSG = "Checkpoint with dirty
pages started! Cluster not idle!";
+
+ /**
+ * See {@link IdleVerifyUtility#checkPartitionsPageCrcSum(FilePageStore,
CacheGroupContext, int, byte,
+ * AtomicBoolean)}.
+ */
+ public static void checkPartitionsPageCrcSum(
+ @Nullable FilePageStoreManager pageStoreMgr,
+ CacheGroupContext grpCtx,
+ int partId,
+ byte pageType,
+ AtomicBoolean cpFlag
+ ) throws IgniteCheckedException, GridNotIdleException {
+ if (!grpCtx.persistenceEnabled() || pageStoreMgr == null)
+ return;
+
+ FilePageStore pageStore =
(FilePageStore)pageStoreMgr.getStore(grpCtx.groupId(), partId);
+
+ checkPartitionsPageCrcSum(pageStore, grpCtx, partId, pageType, cpFlag);
+ }
+
+ /**
+ * Checks CRC sum of pages with {@code pageType} page type stored in
partiion with {@code partId} id and assosiated
+ * with cache group. <br/> Method could be invoked only on idle cluster!
+ *
+ * @param pageStore Page store.
+ * @param grpCtx Passed cache group context.
+ * @param partId Partition id.
+ * @param pageType Page type. Possible types {@link
PageIdAllocator#FLAG_DATA}, {@link PageIdAllocator#FLAG_IDX}.
+ * @param cpFlag Checkpoint flag for detecting start checkpoint with dirty
pages.
+ * @throws IgniteCheckedException If reading page failed.
+ * @throws GridNotIdleException If cluster not idle.
+ */
+ public static void checkPartitionsPageCrcSum(
+ FilePageStore pageStore,
+ CacheGroupContext grpCtx,
+ int partId,
+ byte pageType,
+ AtomicBoolean cpFlag
+ ) throws IgniteCheckedException, GridNotIdleException {
+ assert pageType == PageIdAllocator.FLAG_DATA || pageType ==
PageIdAllocator.FLAG_IDX : pageType;
+
+ long pageId = PageIdUtils.pageId(partId, pageType, 0);
+
+ ByteBuffer buf =
ByteBuffer.allocateDirect(grpCtx.dataRegion().pageMemory().pageSize());
+
+ buf.order(ByteOrder.nativeOrder());
+
+ for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, pageNo++) {
+ buf.clear();
+
+ if (cpFlag.get())
+ throw new GridNotIdleException(CLUSTER_NOT_IDLE_MSG);
+
+ pageStore.read(pageId, buf, true);
+ }
+
+ if (cpFlag.get())
+ throw new GridNotIdleException(CLUSTER_NOT_IDLE_MSG);
+ }
+
+ /**
+ * @param db Shared DB manager.
+ * @return {@code True} if checkpoint is now, {@code False} otherwise.
+ */
+ public static boolean isCheckpointNow(@Nullable
IgniteCacheDatabaseSharedManager db) {
+ if (!(db instanceof GridCacheDatabaseSharedManager))
+ return false;
+
+ GridCacheDatabaseSharedManager.CheckpointProgress progress =
+
((GridCacheDatabaseSharedManager)db).getCheckpointer().currentProgress();
+
+ if (progress == null)
+ return false;
+
+ return progress.started() && !progress.finished();
+ }
+
+ /** */
+ private IdleVerifyUtility() {
+ /* No-op. */
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java
index c0fd36a..2129d70 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java
@@ -16,10 +16,10 @@
*/
package org.apache.ignite.internal.processors.cache.verify;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -42,7 +43,6 @@ import
org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -77,8 +77,10 @@ public class VerifyBackupPartitionsDumpTask extends
ComputeTaskAdapter<VisorIdle
private IgniteLogger log;
/** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
- List<ClusterNode> subgrid, VisorIdleVerifyTaskArg arg) throws
IgniteException {
+ @Override public @Nullable Map<? extends ComputeJob, ClusterNode> map(
+ List<ClusterNode> subgrid,
+ VisorIdleVerifyTaskArg arg
+ ) throws IgniteException {
if (arg instanceof VisorIdleVerifyDumpTaskArg)
taskArg = (VisorIdleVerifyDumpTaskArg)arg;
@@ -86,8 +88,7 @@ public class VerifyBackupPartitionsDumpTask extends
ComputeTaskAdapter<VisorIdle
}
/** {@inheritDoc} */
- @Nullable @Override public String reduce(List<ComputeJobResult> results)
- throws IgniteException {
+ @Override public @Nullable String reduce(List<ComputeJobResult> results)
throws IgniteException {
Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new
TreeMap<>(buildPartitionKeyComparator());
for (ComputeJobResult res : results) {
@@ -123,19 +124,11 @@ public class VerifyBackupPartitionsDumpTask extends
ComputeTaskAdapter<VisorIdle
}
/** {@inheritDoc} */
- @Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws
- IgniteException {
- ComputeJobResultPolicy superRes = super.result(res, rcvd);
-
- // Deny failover.
- if (superRes == ComputeJobResultPolicy.FAILOVER) {
- superRes = ComputeJobResultPolicy.WAIT;
-
- log.warning("VerifyBackupPartitionsJobV2 failed on node " +
- "[consistentId=" + res.getNode().consistentId() + "]",
res.getException());
- }
-
- return superRes;
+ @Override public ComputeJobResultPolicy result(
+ ComputeJobResult res,
+ List<ComputeJobResult> rcvd
+ ) throws IgniteException {
+ return delegate.result(res, rcvd);
}
/**
@@ -177,46 +170,18 @@ public class VerifyBackupPartitionsDumpTask extends
ComputeTaskAdapter<VisorIdle
IdleVerifyResultV2 conflictRes,
int skippedRecords
) throws IgniteException {
- File workDir = ignite.configuration().getWorkDirectory() == null
- ? new File("/tmp")
- : new File(ignite.configuration().getWorkDirectory());
+ String wd = ignite.configuration().getWorkDirectory();
+
+ File workDir = wd == null ? new File("/tmp") : new File(wd);
File out = new File(workDir, IDLE_DUMP_FILE_PREFIX +
LocalDateTime.now().format(TIME_FORMATTER) + ".txt");
ignite.log().info("IdleVerifyDumpTask will write output to " +
out.getAbsolutePath());
- try (BufferedWriter writer = new BufferedWriter(new FileWriter(out))) {
- try {
-
- writer.write("idle_verify check has finished, found " +
partitions.size() + " partitions\n");
+ try (PrintWriter writer = new PrintWriter(new FileWriter(out))) {
+ writeResult(partitions, conflictRes, skippedRecords, writer);
- if (skippedRecords > 0)
- writer.write(skippedRecords + " partitions was skipped\n");
-
- if (!F.isEmpty(partitions)) {
- writer.write("Cluster partitions:\n");
-
- for (Map.Entry<PartitionKeyV2,
List<PartitionHashRecordV2>> entry : partitions.entrySet()) {
- writer.write("Partition: " + entry.getKey() + "\n");
-
- writer.write("Partition instances: " +
entry.getValue() + "\n");
- }
-
-
writer.write("\n\n-----------------------------------\n\n");
-
- conflictRes.print(str -> {
- try {
- writer.write(str);
- }
- catch (IOException e) {
- throw new IgniteException("Failed to write
partitions conflict.", e);
- }
- });
- }
- }
- finally {
- writer.flush();
- }
+ writer.flush();
ignite.log().info("IdleVerifyDumpTask successfully written dump to
'" + out.getAbsolutePath() + "'");
}
@@ -229,10 +194,43 @@ public class VerifyBackupPartitionsDumpTask extends
ComputeTaskAdapter<VisorIdle
return out.getAbsolutePath();
}
+ /** */
+ private void writeResult(
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> partitions,
+ IdleVerifyResultV2 conflictRes,
+ int skippedRecords,
+ PrintWriter writer
+ ) {
+ if (!F.isEmpty(conflictRes.exceptions())) {
+ int size = conflictRes.exceptions().size();
+
+ writer.write("idle_verify failed on " + size + " node" + (size ==
1 ? "" : "s") + ".\n");
+ }
+
+ writer.write("idle_verify check has finished, found " +
partitions.size() + " partitions\n");
+
+ if (skippedRecords > 0)
+ writer.write(skippedRecords + " partitions was skipped\n");
+
+ if (!F.isEmpty(partitions)) {
+ writer.write("Cluster partitions:\n");
+
+ for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry
: partitions.entrySet()) {
+ writer.write("Partition: " + entry.getKey() + "\n");
+
+ writer.write("Partition instances: " + entry.getValue() +
"\n");
+ }
+
+ writer.write("\n\n-----------------------------------\n\n");
+
+ conflictRes.print(writer::write);
+ }
+ }
+
/**
* @return Comparator for {@link PartitionHashRecordV2}.
*/
- @NotNull private Comparator<PartitionHashRecordV2> buildRecordComparator()
{
+ private Comparator<PartitionHashRecordV2> buildRecordComparator() {
return (o1, o2) -> {
int compare = Boolean.compare(o1.isPrimary(), o2.isPrimary());
@@ -246,7 +244,7 @@ public class VerifyBackupPartitionsDumpTask extends
ComputeTaskAdapter<VisorIdle
/**
* @return Comparator for {@link PartitionKeyV2}.
*/
- @NotNull private Comparator<PartitionKeyV2> buildPartitionKeyComparator() {
+ private Comparator<PartitionKeyV2> buildPartitionKeyComparator() {
return (o1, o2) -> {
int compare = Integer.compare(o1.groupId(), o2.groupId());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
index c30d37b..c5bc8d3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
@@ -25,19 +25,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
@@ -54,8 +52,14 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.verify.CacheFilterEnum;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg;
@@ -65,6 +69,9 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+
/**
* Task for comparing update counters and checksums between primary and backup
partitions of specified caches.
* <br>
@@ -89,7 +96,9 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
/** {@inheritDoc} */
@Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
- List<ClusterNode> subgrid, VisorIdleVerifyTaskArg arg) throws
IgniteException {
+ List<ClusterNode> subgrid,
+ VisorIdleVerifyTaskArg arg
+ ) throws IgniteException {
Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
for (ClusterNode node : subgrid)
@@ -99,27 +108,46 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
}
/** {@inheritDoc} */
- @Nullable @Override public IdleVerifyResultV2
reduce(List<ComputeJobResult> results)
- throws IgniteException {
+ @Nullable @Override public IdleVerifyResultV2
reduce(List<ComputeJobResult> results) throws IgniteException {
Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new
HashMap<>();
- Map<UUID, Exception> exceptions = new HashMap<>();
-
- for (ComputeJobResult res : results) {
- if (res.getException() != null) {
- exceptions.put(res.getNode().id(), res.getException());
- continue;
- }
+ Map<ClusterNode, Exception> exceptions = new HashMap<>();
- Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes =
res.getData();
+ reduceResults(results, clusterHashes, exceptions);
- for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e :
nodeHashes.entrySet()) {
- List<PartitionHashRecordV2> records =
clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
+ return checkConflicts(clusterHashes, exceptions);
+ }
- records.add(e.getValue());
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(
+ ComputeJobResult res,
+ List<ComputeJobResult> rcvd
+ ) throws IgniteException {
+ try {
+ ComputeJobResultPolicy superRes = super.result(res, rcvd);
+
+ // Deny failover.
+ if (superRes == ComputeJobResultPolicy.FAILOVER) {
+ superRes = ComputeJobResultPolicy.WAIT;
+
+ if (log != null) {
+ log.warning("VerifyBackupPartitionsJobV2 failed on node " +
+ "[consistentId=" + res.getNode().consistentId() + "]",
res.getException());
+ }
}
+
+ return superRes;
}
+ catch (IgniteException e) {
+ return ComputeJobResultPolicy.WAIT;
+ }
+ }
+ /** */
+ private IdleVerifyResultV2 checkConflicts(
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes,
+ Map<ClusterNode, Exception> exceptions
+ ) {
Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashConflicts = new
HashMap<>();
Map<PartitionKeyV2, List<PartitionHashRecordV2>> updateCntrConflicts =
new HashMap<>();
@@ -158,20 +186,27 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
return new IdleVerifyResultV2(updateCntrConflicts, hashConflicts,
movingParts, exceptions);
}
- /** {@inheritDoc} */
- @Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws
- IgniteException {
- ComputeJobResultPolicy superRes = super.result(res, rcvd);
+ /** */
+ private void reduceResults(
+ List<ComputeJobResult> results,
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes,
+ Map<ClusterNode, Exception> exceptions
+ ) {
+ for (ComputeJobResult res : results) {
+ if (res.getException() != null) {
+ exceptions.put(res.getNode(), res.getException());
- // Deny failover.
- if (superRes == ComputeJobResultPolicy.FAILOVER) {
- superRes = ComputeJobResultPolicy.WAIT;
+ continue;
+ }
- log.warning("VerifyBackupPartitionsJobV2 failed on node " +
- "[consistentId=" + res.getNode().consistentId() + "]",
res.getException());
- }
+ Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes =
res.getData();
+
+ for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e :
nodeHashes.entrySet()) {
+ List<PartitionHashRecordV2> records =
clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
- return superRes;
+ records.add(e.getValue());
+ }
+ }
}
/**
@@ -204,39 +239,102 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
/** {@inheritDoc} */
@Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute()
throws IgniteException {
- Set<Integer> grpIds = new HashSet<>();
+ Set<Integer> grpIds = getGroupIds();
- if (arg.getCaches() != null && !arg.getCaches().isEmpty()) {
- Set<String> missingCaches = new HashSet<>();
+ completionCntr.set(0);
- for (String cacheName : arg.getCaches()) {
- DynamicCacheDescriptor desc =
ignite.context().cache().cacheDescriptor(cacheName);
+ AtomicBoolean cpFlag = new AtomicBoolean();
- if (desc == null || !isCacheMatchFilter(cacheName)) {
- missingCaches.add(cacheName);
+ GridCacheDatabaseSharedManager db = null;
- continue;
+ DbCheckpointListener lsnr = null;
+
+ if (arg.isCheckCrc() &&
+ ignite.context().cache().context().database() instanceof
GridCacheDatabaseSharedManager) {
+ db =
(GridCacheDatabaseSharedManager)ignite.context().cache().context().database();
+
+ lsnr = new DbCheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ /* No-op. */
}
- grpIds.add(desc.groupId());
- }
+ @Override public void onCheckpointBegin(Context ctx) {
+ if (ctx.hasPages())
+ cpFlag.set(true);
+ }
+ };
- handlingMissedCaches(missingCaches);
+ db.addCheckpointListener(lsnr);
}
- else if (onlySpecificCaches()) {
- for (DynamicCacheDescriptor desc :
ignite.context().cache().cacheDescriptors().values()) {
- if (desc.cacheConfiguration().getCacheMode() !=
CacheMode.LOCAL
- && isCacheMatchFilter(desc.cacheName()))
- grpIds.add(desc.groupId());
+
+ try {
+ if (arg.isCheckCrc() && IdleVerifyUtility.isCheckpointNow(db))
+ throw new
GridNotIdleException(IdleVerifyUtility.CLUSTER_NOT_IDLE_MSG);
+
+ List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>>
partHashCalcFuts =
+ calcPartitionHashAsync(grpIds, cpFlag);
+
+ Map<PartitionKeyV2, PartitionHashRecordV2> res = new
HashMap<>();
+
+ List<IgniteException> exceptions = new ArrayList<>();
+
+ long lastProgressLogTs = U.currentTimeMillis();
+
+ for (int i = 0; i < partHashCalcFuts.size(); ) {
+ Future<Map<PartitionKeyV2, PartitionHashRecordV2>> fut =
partHashCalcFuts.get(i);
+
+ try {
+ Map<PartitionKeyV2, PartitionHashRecordV2> partHash =
fut.get(100, TimeUnit.MILLISECONDS);
+
+ res.putAll(partHash);
+
+ i++;
+ }
+ catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof IgniteException &&
!(e.getCause() instanceof GridNotIdleException)) {
+ exceptions.add((IgniteException)e.getCause());
+
+ i++;
+
+ continue;
+ }
+
+ for (int j = i + 1; j < partHashCalcFuts.size(); j++)
+ partHashCalcFuts.get(j).cancel(false);
+
+ if (e instanceof InterruptedException)
+ throw new
IgniteInterruptedException((InterruptedException)e);
+ else
+ throw new IgniteException(e.getCause());
+ }
+ catch (TimeoutException ignored) {
+ if (U.currentTimeMillis() - lastProgressLogTs > 3 * 60
* 1000L) {
+ lastProgressLogTs = U.currentTimeMillis();
+
+ log.warning("idle_verify is still running,
processed " + completionCntr.get() + " of " +
+ partHashCalcFuts.size() + " local partitions");
+ }
+ }
}
+
+ if (!F.isEmpty(exceptions))
+ throw new IdleVerifyException(exceptions);
+
+ return res;
}
- else
- grpIds = getCacheGroupIds();
+ finally {
+ if (db != null && lsnr != null)
+ db.removeCheckpointListener(lsnr);
+ }
+ }
+ /** */
+ private List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>>
calcPartitionHashAsync(
+ Set<Integer> grpIds,
+ AtomicBoolean cpFlag
+ ) {
List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>>
partHashCalcFutures = new ArrayList<>();
- completionCntr.set(0);
-
for (Integer grpId : grpIds) {
CacheGroupContext grpCtx =
ignite.context().cache().cacheGroup(grpId);
@@ -246,45 +344,43 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
List<GridDhtLocalPartition> parts =
grpCtx.topology().localPartitions();
for (GridDhtLocalPartition part : parts)
-
partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part));
+
partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part, cpFlag));
}
- Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+ return partHashCalcFutures;
+ }
+
+ /** */
+ private Set<Integer> getGroupIds() {
+ Set<Integer> grpIds = new HashSet<>();
- long lastProgressLogTs = U.currentTimeMillis();
+ if (arg.getCaches() != null && !arg.getCaches().isEmpty()) {
+ Set<String> missingCaches = new HashSet<>();
- for (int i = 0; i < partHashCalcFutures.size(); ) {
- Future<Map<PartitionKeyV2, PartitionHashRecordV2>> fut =
partHashCalcFutures.get(i);
+ for (String cacheName : arg.getCaches()) {
+ DynamicCacheDescriptor desc =
ignite.context().cache().cacheDescriptor(cacheName);
- try {
- Map<PartitionKeyV2, PartitionHashRecordV2> partHash =
fut.get(100, TimeUnit.MILLISECONDS);
+ if (desc == null || !isCacheMatchFilter(cacheName)) {
+ missingCaches.add(cacheName);
- res.putAll(partHash);
+ continue;
+ }
- i++;
- }
- catch (InterruptedException | ExecutionException e) {
- for (int j = i + 1; j < partHashCalcFutures.size(); j++)
- partHashCalcFutures.get(j).cancel(false);
-
- if (e instanceof InterruptedException)
- throw new
IgniteInterruptedException((InterruptedException)e);
- else if (e.getCause() instanceof IgniteException)
- throw (IgniteException)e.getCause();
- else
- throw new IgniteException(e.getCause());
+ grpIds.add(desc.groupId());
}
- catch (TimeoutException ignored) {
- if (U.currentTimeMillis() - lastProgressLogTs > 3 * 60 *
1000L) {
- lastProgressLogTs = U.currentTimeMillis();
- log.warning("idle_verify is still running, processed "
+ completionCntr.get() + " of " +
- partHashCalcFutures.size() + " local partitions");
- }
+ handlingMissedCaches(missingCaches);
+ }
+ else if (onlySpecificCaches()) {
+ for (DynamicCacheDescriptor desc :
ignite.context().cache().cacheDescriptors().values()) {
+ if (desc.cacheConfiguration().getCacheMode() != LOCAL &&
isCacheMatchFilter(desc.cacheName()))
+ grpIds.add(desc.groupId());
}
}
+ else
+ grpIds = getCacheGroupIds();
- return res;
+ return grpIds;
}
/**
@@ -295,11 +391,12 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
Set<Integer> grpIds = new HashSet<>();
- if (arg.excludeCaches() == null || arg.excludeCaches().isEmpty()) {
+ if (F.isEmpty(arg.getExcludeCaches())) {
for (CacheGroupContext grp : groups) {
if (!grp.systemCache() && !grp.isLocal())
grpIds.add(grp.groupId());
}
+
return grpIds;
}
@@ -315,11 +412,11 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
* @param grp Group.
*/
private boolean isGrpExcluded(CacheGroupContext grp) {
- if (arg.excludeCaches().contains(grp.name()))
+ if (arg.getExcludeCaches().contains(grp.name()))
return true;
for (GridCacheContext cacheCtx : grp.caches()) {
- if (arg.excludeCaches().contains(cacheCtx.name()))
+ if (arg.getExcludeCaches().contains(cacheCtx.name()))
return true;
}
@@ -327,7 +424,7 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
}
/**
- * Checks and throw exception if caches was missed.
+ * Checks and throw exception if caches was missed.
*
* @param missingCaches Missing caches.
*/
@@ -335,22 +432,20 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
if (missingCaches.isEmpty())
return;
- StringBuilder strBuilder = new StringBuilder("The following caches
do not exist");
+ SB strBuilder = new SB("The following caches do not exist");
if (onlySpecificCaches()) {
VisorIdleVerifyDumpTaskArg vdta =
(VisorIdleVerifyDumpTaskArg)arg;
- strBuilder.append(" or do not match to the given filter [")
- .append(vdta.getCacheFilterEnum())
- .append("]: ");
+ strBuilder.a(" or do not match to the given filter
[").a(vdta.getCacheFilterEnum()).a("]: ");
}
else
- strBuilder.append(": ");
+ strBuilder.a(": ");
for (String name : missingCaches)
- strBuilder.append(name).append(", ");
+ strBuilder.a(name).a(", ");
- strBuilder.delete(strBuilder.length() - 2, strBuilder.length());
+ strBuilder.d(strBuilder.length() - 2, strBuilder.length());
throw new IgniteException(strBuilder.toString());
}
@@ -373,9 +468,12 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
*/
private boolean isCacheMatchFilter(String cacheName) {
if (arg instanceof VisorIdleVerifyDumpTaskArg) {
- DataStorageConfiguration dsc =
ignite.context().config().getDataStorageConfiguration();
+ DataStorageConfiguration dsCfg =
ignite.context().config().getDataStorageConfiguration();
+
DynamicCacheDescriptor desc =
ignite.context().cache().cacheDescriptor(cacheName);
+
CacheConfiguration cc = desc.cacheConfiguration();
+
VisorIdleVerifyDumpTaskArg vdta =
(VisorIdleVerifyDumpTaskArg)arg;
switch (vdta.getCacheFilterEnum()) {
@@ -383,16 +481,16 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
return !desc.cacheType().userCache();
case NOT_PERSISTENT:
- return desc.cacheType().userCache() &&
!GridCacheUtils.isPersistentCache(cc, dsc);
+ return desc.cacheType().userCache() &&
!GridCacheUtils.isPersistentCache(cc, dsCfg);
case PERSISTENT:
- return desc.cacheType().userCache() &&
GridCacheUtils.isPersistentCache(cc, dsc);
+ return desc.cacheType().userCache() &&
GridCacheUtils.isPersistentCache(cc, dsCfg);
case ALL:
break;
default:
- assert false: "Illegal cache filter: " +
vdta.getCacheFilterEnum();
+ assert false : "Illegal cache filter: " +
vdta.getCacheFilterEnum();
}
}
@@ -402,25 +500,25 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
/**
* @param grpCtx Group context.
* @param part Local partition.
+ * @param cpFlag Checkpoint flag.
*/
private Future<Map<PartitionKeyV2, PartitionHashRecordV2>>
calculatePartitionHashAsync(
final CacheGroupContext grpCtx,
- final GridDhtLocalPartition part
+ final GridDhtLocalPartition part,
+ AtomicBoolean cpFlag
) {
- return ForkJoinPool.commonPool().submit(new
Callable<Map<PartitionKeyV2, PartitionHashRecordV2>>() {
- @Override public Map<PartitionKeyV2, PartitionHashRecordV2>
call() throws Exception {
- return calculatePartitionHash(grpCtx, part);
- }
- });
+ return ForkJoinPool.commonPool().submit(() ->
calculatePartitionHash(grpCtx, part, cpFlag));
}
/**
* @param grpCtx Group context.
* @param part Local partition.
+ * @param cpFlag Checkpoint flag.
*/
private Map<PartitionKeyV2, PartitionHashRecordV2>
calculatePartitionHash(
CacheGroupContext grpCtx,
- GridDhtLocalPartition part
+ GridDhtLocalPartition part,
+ AtomicBoolean cpFlag
) {
if (!part.reserve())
return Collections.emptyMap();
@@ -447,6 +545,9 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
partSize = part.dataStore().fullSize();
+ if (arg.isCheckCrc())
+ checkPartitionCrc(grpCtx, part, cpFlag);
+
GridIterator<CacheDataRow> it =
grpCtx.offheap().partitionIterator(part.id());
while (it.hasNextX()) {
@@ -460,7 +561,7 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
long updateCntrAfter = part.updateCounter();
if (updateCntrBefore != updateCntrAfter) {
- throw new IgniteException("Cluster is not idle: update
counter of partition [grpId=" +
+ throw new GridNotIdleException("Update counter of
partition [grpId=" +
grpCtx.groupId() + ", partId=" + part.id() + "]
changed during hash calculation [before=" +
updateCntrBefore + ", after=" + updateCntrAfter + "]");
}
@@ -469,18 +570,59 @@ public class VerifyBackupPartitionsTaskV2 extends
ComputeTaskAdapter<VisorIdleVe
U.error(log, "Can't calculate partition hash [grpId=" +
grpCtx.groupId() +
", partId=" + part.id() + "]", e);
- return Collections.emptyMap();
+ throw new IgniteException("Can't calculate partition hash
[grpId=" + grpCtx.groupId() +
+ ", partId=" + part.id() + "]", e);
}
finally {
part.release();
}
- PartitionHashRecordV2 partRec = new PartitionHashRecordV2(
- partKey, isPrimary, consId, partHash, updateCntrBefore,
partSize);
+ PartitionHashRecordV2 partRec =
+ new PartitionHashRecordV2(partKey, isPrimary, consId,
partHash, updateCntrBefore, partSize);
completionCntr.incrementAndGet();
return Collections.singletonMap(partKey, partRec);
}
+
+ /**
+ * Checks correct CRC sum for given partition and cache group.
+ *
+ * @param grpCtx Cache group context
+ * @param part partition.
+ * @param cpFlag Checkpoint flag.
+ */
+ private void checkPartitionCrc(CacheGroupContext grpCtx,
GridDhtLocalPartition part, AtomicBoolean cpFlag) {
+ if (grpCtx.persistenceEnabled()) {
+ FilePageStore pageStore = null;
+
+ try {
+ FilePageStoreManager pageStoreMgr =
+
(FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+ if (pageStoreMgr == null)
+ return;
+
+ pageStore =
(FilePageStore)pageStoreMgr.getStore(grpCtx.groupId(), part.id());
+
+ IdleVerifyUtility.checkPartitionsPageCrcSum(pageStore,
grpCtx, part.id(), FLAG_DATA, cpFlag);
+ }
+ catch (GridNotIdleException e) {
+ throw e;
+ }
+ catch (Exception | AssertionError e) {
+ if (cpFlag.get())
+ throw new GridNotIdleException("Checkpoint with dirty
pages started! Cluster not idle!", e);
+
+ String msg = new SB("CRC check of partition:
").a(part.id()).a(", for cache group ")
+ .a(grpCtx.cacheOrGroupName()).a(" failed.")
+ .a(pageStore != null ? " file: " +
pageStore.getFileAbsolutePath() : "").toString();
+
+ log.error(msg, e);
+
+ throw new IgniteException(msg, e);
+ }
+ }
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java
index 3c836a4..8b92fbf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java
@@ -48,9 +48,16 @@ public class VisorIdleVerifyDumpTaskArg extends
VisorIdleVerifyTaskArg {
* @param excludeCaches Caches to exclude.
* @param skipZeros Skip zeros partitions.
* @param cacheFilterEnum Cache kind.
+ * @param checkCrc Check partition crc sum.
*/
- public VisorIdleVerifyDumpTaskArg(Set<String> caches, Set<String>
excludeCaches, boolean skipZeros, CacheFilterEnum cacheFilterEnum) {
- super(caches, excludeCaches);
+ public VisorIdleVerifyDumpTaskArg(
+ Set<String> caches,
+ Set<String> excludeCaches,
+ boolean skipZeros,
+ CacheFilterEnum cacheFilterEnum,
+ boolean checkCrc
+ ) {
+ super(caches, excludeCaches, checkCrc);
this.skipZeros = skipZeros;
this.cacheFilterEnum = cacheFilterEnum;
}
@@ -75,28 +82,65 @@ public class VisorIdleVerifyDumpTaskArg extends
VisorIdleVerifyTaskArg {
out.writeBoolean(skipZeros);
- U.writeEnum(out, cacheFilterEnum);
+ /**
+ * Since protocol version 2 we must save class instance new fields to
end of output object. It's needs for
+ * support backward compatibility in extended (child) classes.
+ *
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will
remove in 3.0
+ */
+ if (instanceOfCurrentClass()) {
+ U.writeEnum(out, cacheFilterEnum);
+
+ U.writeCollection(out, getExcludeCaches());
+
+ out.writeBoolean(isCheckCrc());
+ }
}
/** {@inheritDoc} */
- @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
super.readExternalData(protoVer, in);
skipZeros = in.readBoolean();
- if (protoVer >= V2)
- cacheFilterEnum = CacheFilterEnum.fromOrdinal(in.readByte());
- else
- cacheFilterEnum = CacheFilterEnum.ALL;
+ /**
+ * Since protocol version 2 we must read class instance new fields
from end of input object. It's needs for
+ * support backward compatibility in extended (child) classes.
+ *
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will
remove in 3.0
+ */
+ if (instanceOfCurrentClass()) {
+ if (protoVer >= V2)
+ cacheFilterEnum = CacheFilterEnum.fromOrdinal(in.readByte());
+ else
+ cacheFilterEnum = CacheFilterEnum.ALL;
+
+ if (protoVer >= V2)
+ excludeCaches(U.readSet(in));
+
+ if (protoVer >= V3)
+ checkCrc(in.readBoolean());
+ }
}
/** {@inheritDoc} */
@Override public byte getProtocolVersion() {
- return V2;
+ return (byte)Math.max(V2, super.getProtocolVersion());
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(VisorIdleVerifyDumpTaskArg.class, this);
}
+
+ /**
+ * @return {@code True} if current instance is a instance of current class
(not a child class) and {@code False} if
+ * current instance is a instance of extented class (i.e child class).
+ */
+ private boolean instanceOfCurrentClass() {
+ return VisorIdleVerifyDumpTaskArg.class == getClass();
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
index a8dc697..d4652cc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
@@ -49,8 +49,11 @@ class VisorIdleVerifyJob<ResultT> extends
VisorJob<VisorIdleVerifyTaskArg, Resul
* @param debug Debug.
* @param taskCls Task class for execution.
*/
- VisorIdleVerifyJob(VisorIdleVerifyTaskArg arg, boolean debug,
- Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls)
{
+ VisorIdleVerifyJob(
+ VisorIdleVerifyTaskArg arg,
+ boolean debug,
+ Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls
+ ) {
super(arg, debug);
this.taskCls = taskCls;
}
@@ -63,11 +66,7 @@ class VisorIdleVerifyJob<ResultT> extends
VisorJob<VisorIdleVerifyTaskArg, Resul
if (!fut.isDone()) {
jobCtx.holdcc();
- fut.listen(new IgniteInClosure<IgniteFuture<ResultT>>() {
- @Override public void apply(IgniteFuture<ResultT> f) {
- jobCtx.callcc();
- }
- });
+ fut.listen((IgniteInClosure<IgniteFuture<ResultT>>)f ->
jobCtx.callcc());
return null;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskArg.java
index d645fec..2bcf4bf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskArg.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskArg.java
@@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorDataTransferObject;
/**
- * Arguments for task {@link VisorIdleVerifyTask}
+ * Arguments for task {@link VisorIdleVerifyTask}.
+ * <br/>
*/
public class VisorIdleVerifyTaskArg extends VisorDataTransferObject {
/** */
@@ -38,6 +39,9 @@ public class VisorIdleVerifyTaskArg extends
VisorDataTransferObject {
/** Exclude caches or groups. */
private Set<String> excludeCaches;
+ /** Check CRC */
+ private boolean checkCrc;
+
/**
* Default constructor.
*/
@@ -48,10 +52,21 @@ public class VisorIdleVerifyTaskArg extends
VisorDataTransferObject {
/**
* @param caches Caches.
* @param excludeCaches Exclude caches or group.
+ * @param checkCrc Check CRC sum on stored pages on disk.
*/
- public VisorIdleVerifyTaskArg(Set<String> caches, Set<String>
excludeCaches) {
+ public VisorIdleVerifyTaskArg(Set<String> caches, Set<String>
excludeCaches, boolean checkCrc) {
this.caches = caches;
this.excludeCaches = excludeCaches;
+ this.checkCrc = checkCrc;
+ }
+
+ /**
+ * @param caches Caches.
+ * @param checkCrc Check CRC sum on stored pages on disk.
+ */
+ public VisorIdleVerifyTaskArg(Set<String> caches, boolean checkCrc) {
+ this.caches = caches;
+ this.checkCrc = checkCrc;
}
/**
@@ -59,9 +74,12 @@ public class VisorIdleVerifyTaskArg extends
VisorDataTransferObject {
*/
public VisorIdleVerifyTaskArg(Set<String> caches) {
this.caches = caches;
- this.excludeCaches = excludeCaches;
}
+ /** */
+ public boolean isCheckCrc() {
+ return checkCrc;
+ }
/**
* @return Caches.
@@ -73,31 +91,74 @@ public class VisorIdleVerifyTaskArg extends
VisorDataTransferObject {
/**
* @return Exclude caches or groups.
*/
- public Set<String> excludeCaches() {
+ public Set<String> getExcludeCaches() {
return excludeCaches;
}
/** {@inheritDoc} */
@Override public byte getProtocolVersion() {
- return V2;
+ return V3;
}
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws
IOException {
U.writeCollection(out, caches);
- U.writeCollection(out, excludeCaches);
+
+ /**
+ * Instance fields since protocol version 2 must be serialized if, and
only if class instance isn't child of
+ * current class. Otherwise, these fields must be serialized in child
class.
+ *
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will
remove in 3.0
+ */
+ if (instanceOfCurrentClass()) {
+ U.writeCollection(out, excludeCaches);
+
+ out.writeBoolean(checkCrc);
+ }
}
/** {@inheritDoc} */
- @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
caches = U.readSet(in);
- if (protoVer >= V2)
- excludeCaches = U.readSet(in);
+ /**
+ * Instance fields since protocol version 2 must be deserialized if,
and only if class instance isn't child of
+ * current class. Otherwise, these fields must be deserialized in
child class.
+ *
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will
remove in 3.0
+ */
+ if (instanceOfCurrentClass()) {
+ if (protoVer >= V2)
+ excludeCaches = U.readSet(in);
+
+ if (protoVer >= V3)
+ checkCrc = in.readBoolean();
+ }
+ }
+
+ /** */
+ protected void excludeCaches(Set<String> excludeCaches) {
+ this.excludeCaches = excludeCaches;
+ }
+
+ /** */
+ protected void checkCrc(boolean checkCrc) {
+ this.checkCrc = checkCrc;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(VisorIdleVerifyTaskArg.class, this);
}
+
+ /**
+ * @return {@code True} if current instance is a instance of current class
(not a child class) and {@code False} if
+ * current instance is a instance of extented class (i.e child class).
+ */
+ private boolean instanceOfCurrentClass() {
+ return VisorIdleVerifyTaskArg.class == getClass();
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskV2.java
index b9250ef..6deaf07 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskV2.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskV2.java
@@ -17,18 +17,11 @@
package org.apache.ignite.internal.visor.verify;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import
org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.resources.JobContextResource;
/**
* Task to verify checksums of backup partitions.
@@ -40,55 +33,6 @@ public class VisorIdleVerifyTaskV2 extends
VisorOneNodeTask<VisorIdleVerifyTaskA
/** {@inheritDoc} */
@Override protected VisorJob<VisorIdleVerifyTaskArg, IdleVerifyResultV2>
job(VisorIdleVerifyTaskArg arg) {
- return new VisorIdleVerifyJobV2(arg, debug);
- }
-
- /**
- *
- */
- private static class VisorIdleVerifyJobV2 extends
VisorJob<VisorIdleVerifyTaskArg, IdleVerifyResultV2> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private ComputeTaskFuture<IdleVerifyResultV2> fut;
-
- /** Auto-inject job context. */
- @JobContextResource
- protected transient ComputeJobContext jobCtx;
-
- /**
- * @param arg Argument.
- * @param debug Debug.
- */
- private VisorIdleVerifyJobV2(VisorIdleVerifyTaskArg arg, boolean
debug) {
- super(arg, debug);
- }
-
- /** {@inheritDoc} */
- @Override protected IdleVerifyResultV2 run(VisorIdleVerifyTaskArg arg)
throws IgniteException {
- if (fut == null) {
- fut =
ignite.compute().executeAsync(VerifyBackupPartitionsTaskV2.class, arg);
-
- if (!fut.isDone()) {
- jobCtx.holdcc();
-
- fut.listen(new
IgniteInClosure<IgniteFuture<IdleVerifyResultV2>>() {
- @Override public void
apply(IgniteFuture<IdleVerifyResultV2> f) {
- jobCtx.callcc();
- }
- });
-
- return null;
- }
- }
-
- return fut.get();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(VisorIdleVerifyJobV2.class, this);
- }
+ return new VisorIdleVerifyJob<>(arg, debug,
VerifyBackupPartitionsTaskV2.class);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
index 6944e21..6e29340 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
@@ -185,7 +185,8 @@ public class PageMemoryTracker implements IgnitePlugin {
cleanupPages(fullPageId -> fullPageId.groupId() ==
grp.groupId());
}
- @Override public void onPartitionDestroyed(int grpId, int
partId, int tag) throws IgniteCheckedException {
+ @Override
+ public void onPartitionDestroyed(int grpId, int partId, int
tag) throws IgniteCheckedException {
super.onPartitionDestroyed(grpId, partId, tag);
cleanupPages(fullPageId -> fullPageId.groupId() == grpId
@@ -212,7 +213,7 @@ public class PageMemoryTracker implements IgnitePlugin {
Mockito.doReturn(pageSize).when(pageMemoryMock).pageSize();
Mockito.when(pageMemoryMock.realPageSize(Mockito.anyInt())).then(mock
-> {
- int grpId = (Integer) mock.getArguments()[0];
+ int grpId = (Integer)mock.getArguments()[0];
if (gridCtx.encryption().groupKey(grpId) == null)
return pageSize;
@@ -247,9 +248,15 @@ public class PageMemoryTracker implements IgnitePlugin {
freeSlotsCnt = maxPages;
if (cfg.isCheckPagesOnCheckpoint()) {
- checkpointLsnr = ctx -> {
- if (!checkPages(false))
- throw new IgniteCheckedException("Page memory is
inconsistent after applying WAL delta records.");
+ checkpointLsnr = new DbCheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx)
throws IgniteCheckedException {
+ if (!checkPages(false))
+ throw new IgniteCheckedException("Page memory is
inconsistent after applying WAL delta records.");
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) throws
IgniteCheckedException {
+ /* No-op. */
+ }
};
((GridCacheDatabaseSharedManager)gridCtx.cache().context().database()).addCheckpointListener(checkpointLsnr);
@@ -298,7 +305,6 @@ public class PageMemoryTracker implements IgnitePlugin {
return (cfg != null && cfg.isEnabled() &&
CU.isPersistenceEnabled(ctx.igniteConfiguration()));
}
-
/**
* Cleanup pages by predicate.
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index e1306f5..833a71e 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -89,6 +89,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridBusyLock;
@@ -2131,4 +2132,14 @@ public final class GridTestUtils {
return DriverManager.getConnection(connStr);
}
+
+ /**
+ * Removes idle_verify log files created in tests.
+ */
+ public static void cleanIdleVerifyLogFiles() {
+ File dir = new File(".");
+
+ for (File f : dir.listFiles(n ->
n.getName().startsWith(IdleVerifyResultV2.IDLE_VERIFY_FILE_PREFIX)))
+ f.delete();
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 30a532f..29d40d2 100644
---
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -21,6 +21,8 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -36,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
@@ -151,6 +154,13 @@ public class GridCommandHandlerTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ GridTestUtils.cleanIdleVerifyLogFiles();
+ }
+
+ /** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
System.setProperty(IGNITE_ENABLE_EXPERIMENTAL_COMMAND, "true");
@@ -160,7 +170,7 @@ public class GridCommandHandlerTest extends
GridCommonAbstractTest {
sysOut = System.out;
- testOut = new ByteArrayOutputStream(128 * 1024);
+ testOut = new ByteArrayOutputStream(1024 * 1024);
}
/** {@inheritDoc} */
@@ -1223,6 +1233,83 @@ public class GridCommandHandlerTest extends
GridCommonAbstractTest {
checkExceptionMessageOnReport(unstableId);
}
+ /** */
+ @Test
+ public void testCacheIdleVerifyCrcWithCorruptedPartition() throws
Exception {
+ testCacheIdleVerifyWithCorruptedPartition("--cache", "idle_verify",
"--check-crc");
+
+ String out = testOut.toString();
+
+ assertTrue(out.contains("idle_verify failed on 1 node."));
+ assertTrue(out.contains("See log for additional information."));
+ }
+
+ /** */
+ @Test
+ public void testCacheIdleVerifyDumpCrcWithCorruptedPartition() throws
Exception {
+ testCacheIdleVerifyWithCorruptedPartition("--cache", "idle_verify",
"--dump", "--check-crc");
+
+ String parts[] = testOut.toString().split("VisorIdleVerifyDumpTask
successfully written output to '");
+
+ assertEquals(2, parts.length);
+
+ String dumpFile = parts[1].split("\\.")[0] + ".txt";
+
+ for (String line : Files.readAllLines(new File(dumpFile).toPath()))
+ System.out.println(line);
+
+ String outputStr = testOut.toString();
+
+ assertTrue(outputStr.contains("idle_verify failed on 1 node."));
+ assertTrue(outputStr.contains("idle_verify check has finished, no
conflicts have been found."));
+ }
+
+ /** */
+ private void corruptPartition(File partitionsDir) throws IOException {
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+ for(File partFile : partitionsDir.listFiles((d, n) ->
n.startsWith("part"))) {
+ try (RandomAccessFile raf = new RandomAccessFile(partFile, "rw")) {
+ byte[] buf = new byte[1024];
+
+ rand.nextBytes(buf);
+
+ raf.seek(4096 * 2 + 1);
+
+ raf.write(buf);
+ }
+ }
+ }
+
+ /** */
+ private void testCacheIdleVerifyWithCorruptedPartition(String... args)
throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.cluster().active(true);
+
+ createCacheAndPreload(ignite, 1000);
+
+ Serializable consistId = ignite.configuration().getConsistentId();
+
+ File partitionsDir = U.resolveWorkDirectory(
+ ignite.configuration().getWorkDirectory(),
+ "db/" + consistId + "/cache-" + DEFAULT_CACHE_NAME,
+ false
+ );
+
+ stopGrid(0);
+
+ corruptPartition(partitionsDir);
+
+ startGrid(0);
+
+ awaitPartitionMapExchange();
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute(args));
+ }
+
/**
* Creates default cache and preload some data entries.
*
@@ -1253,9 +1340,7 @@ public class GridCommandHandlerTest extends
GridCommonAbstractTest {
assertTrue(dumpWithConflicts.contains("Idle verify failed on
nodes:"));
- assertTrue(dumpWithConflicts.contains("Node ID: " + unstableNodeId
+ "\n" +
- "Exception message:\n" +
- "Node has left grid: " + unstableNodeId));
+ assertTrue(dumpWithConflicts.contains("Node ID: " +
unstableNodeId));
}
else
fail("Should be found dump with conflicts");
@@ -1416,7 +1501,7 @@ public class GridCommandHandlerTest extends
GridCommonAbstractTest {
injectTestSystemOut();
- assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump",
"--excludeCaches", "shared_grp"));
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump",
"--exclude-caches", "shared_grp"));
Matcher fileNameMatcher = dumpFileNameMatcher();
@@ -1461,7 +1546,7 @@ public class GridCommandHandlerTest extends
GridCommonAbstractTest {
injectTestSystemOut();
- assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump",
"--excludeCaches", DEFAULT_CACHE_NAME
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump",
"--exclude-caches", DEFAULT_CACHE_NAME
+ "," + DEFAULT_CACHE_NAME + "_second"));
Matcher fileNameMatcher = dumpFileNameMatcher();
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index 4b68a42..9f2cda1 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -18,8 +18,6 @@ package org.apache.ignite.internal.visor.verify;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,6 +31,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
@@ -41,9 +40,6 @@ import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
-import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -55,7 +51,12 @@ import
org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -76,6 +77,9 @@ import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.Index;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+
/**
* Closure that locally validates indexes of given caches.
* Validation consists of three checks:
@@ -154,7 +158,7 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
/**
*
*/
- private VisorValidateIndexesJobResult call0() throws Exception {
+ private VisorValidateIndexesJobResult call0() {
Set<Integer> grpIds = new HashSet<>();
Set<String> missingCaches = new HashSet<>();
@@ -295,31 +299,56 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
private Map<Integer, IndexIntegrityCheckIssue>
integrityCheckIndexesPartitions(Set<Integer> grpIds) {
List<Future<T2<Integer, IndexIntegrityCheckIssue>>>
integrityCheckFutures = new ArrayList<>(grpIds.size());
- for (Integer grpId: grpIds) {
- final CacheGroupContext grpCtx =
ignite.context().cache().cacheGroup(grpId);
+ Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new
HashMap<>();
- if (grpCtx == null || !grpCtx.persistenceEnabled()) {
- integrityCheckedIndexes.incrementAndGet();
+ int curFut = 0;
- continue;
+ IgniteCacheDatabaseSharedManager db =
ignite.context().cache().context().database();
+
+ DbCheckpointListener lsnr = null;
+
+ try {
+ AtomicBoolean cpFlag = new AtomicBoolean();
+
+ if (db instanceof GridCacheDatabaseSharedManager) {
+ lsnr = new DbCheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ /* No-op. */
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) {
+ if (ctx.hasPages())
+ cpFlag.set(true);
+ }
+ };
+
+
((GridCacheDatabaseSharedManager)db).addCheckpointListener(lsnr);
+
+ if (IdleVerifyUtility.isCheckpointNow(db))
+ throw new
GridNotIdleException(IdleVerifyUtility.CLUSTER_NOT_IDLE_MSG);
}
- Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut =
- calcExecutor.submit(new Callable<T2<Integer,
IndexIntegrityCheckIssue>>() {
- @Override public T2<Integer, IndexIntegrityCheckIssue>
call() throws Exception {
- IndexIntegrityCheckIssue issue =
integrityCheckIndexPartition(grpCtx);
+ for (Integer grpId: grpIds) {
+ final CacheGroupContext grpCtx =
ignite.context().cache().cacheGroup(grpId);
- return new T2<>(grpCtx.groupId(), issue);
- }
- });
+ if (grpCtx == null || !grpCtx.persistenceEnabled()) {
+ integrityCheckedIndexes.incrementAndGet();
- integrityCheckFutures.add(checkFut);
- }
+ continue;
+ }
- Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new
HashMap<>();
+ Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut =
+ calcExecutor.submit(new Callable<T2<Integer,
IndexIntegrityCheckIssue>>() {
+ @Override public T2<Integer,
IndexIntegrityCheckIssue> call() throws Exception {
+ IndexIntegrityCheckIssue issue =
integrityCheckIndexPartition(grpCtx, cpFlag);
+
+ return new T2<>(grpCtx.groupId(), issue);
+ }
+ });
+
+ integrityCheckFutures.add(checkFut);
+ }
- int curFut = 0;
- try {
for (Future<T2<Integer, IndexIntegrityCheckIssue>> fut :
integrityCheckFutures) {
T2<Integer, IndexIntegrityCheckIssue> res = fut.get();
@@ -333,42 +362,33 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
throw unwrapFutureException(e);
}
+ finally {
+ if (db instanceof GridCacheDatabaseSharedManager && lsnr != null)
+
((GridCacheDatabaseSharedManager)db).removeCheckpointListener(lsnr);
+ }
return integrityCheckResults;
}
/**
* @param gctx Cache group context.
+ * @param cpFlag Checkpoint status flag.
*/
- private IndexIntegrityCheckIssue
integrityCheckIndexPartition(CacheGroupContext gctx) {
+ private IndexIntegrityCheckIssue
integrityCheckIndexPartition(CacheGroupContext gctx, AtomicBoolean cpFlag) {
GridKernalContext ctx = ignite.context();
GridCacheSharedContext cctx = ctx.cache().context();
try {
FilePageStoreManager pageStoreMgr =
(FilePageStoreManager)cctx.pageStore();
- if (pageStoreMgr == null)
- return null;
-
- int pageSz = gctx.dataRegion().pageMemory().pageSize();
-
- PageStore pageStore = pageStoreMgr.getStore(gctx.groupId(),
PageIdAllocator.INDEX_PARTITION);
-
- long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION,
PageIdAllocator.FLAG_IDX, 0);
-
- ByteBuffer buf = ByteBuffer.allocateDirect(pageSz);
-
- buf.order(ByteOrder.nativeOrder());
-
- for (int pageNo = 0; pageNo < pageStore.pages(); pageId++,
pageNo++) {
- buf.clear();
-
- pageStore.read(pageId, buf, true);
- }
+ IdleVerifyUtility.checkPartitionsPageCrcSum(pageStoreMgr, gctx,
INDEX_PARTITION, FLAG_IDX, cpFlag);
return null;
}
catch (Throwable t) {
+ if (cpFlag.get())
+ throw new GridNotIdleException("Checkpoint with dirty pages
started! Cluster not idle!", t);
+
log.error("Integrity check of index partition of cache group " +
gctx.cacheOrGroupName() + " failed", t);
return new IndexIntegrityCheckIssue(gctx.cacheOrGroupName(), t);
@@ -728,5 +748,4 @@ public class ValidateIndexesClosure implements
IgniteCallable<VisorValidateIndex
else
return new IgniteException(e.getCause());
}
-
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java
index 3555cef..a9168f3 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java
@@ -254,9 +254,13 @@ public class IgnitePersistentStoreSchemaLoadTest extends
GridCommonAbstractTest
GridCacheDatabaseSharedManager db =
(GridCacheDatabaseSharedManager)node.context().cache().context().database();
db.addCheckpointListener(new DbCheckpointListener() {
- @Override public void onCheckpointBegin(Context ctx) {
+ @Override public void onMarkCheckpointBegin(Context ctx) {
cnt.countDown();
}
+
+ @Override public void onCheckpointBegin(Context ctx) {
+ /* No-op. */
+ }
});
return cnt;