This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new baf2c91 IGNITE-15328 Consistency recovery command (Read Repair via
control.ch) should support cancellation (#9409)
baf2c91 is described below
commit baf2c9143d3b0e40e2440baf6b4f99f4bd3041d0
Author: Anton Vinogradov <[email protected]>
AuthorDate: Mon Sep 20 12:09:36 2021 +0300
IGNITE-15328 Consistency recovery command (Read Repair via control.ch)
should support cancellation (#9409)
---
docs/_docs/sql-reference/operational-commands.adoc | 24 ++++-
.../consistency/ConsistencyCommand.java | 34 ++++++-
.../internal/commandline/query/KillCommand.java | 30 +++++-
.../internal/commandline/query/KillSubcommand.java | 3 +
.../util/GridCommandHandlerAbstractTest.java | 3 +
.../util/GridCommandHandlerConsistencyTest.java | 20 +++-
.../ignite/util/KillCommandsCommandShTest.java | 107 +++++++++++++++++++++
.../internal/processors/job/GridJobWorker.java | 6 +-
.../consistency/VisorConsistencyCancelTask.java | 81 ++++++++++++++++
.../consistency/VisorConsistencyRepairTask.java | 57 ++++++++---
.../VisorConsistencyRepairTaskResult.java | 102 ++++++++++++++++++++
.../main/resources/META-INF/classnames.properties | 2 +
12 files changed, 447 insertions(+), 22 deletions(-)
diff --git a/docs/_docs/sql-reference/operational-commands.adoc
b/docs/_docs/sql-reference/operational-commands.adoc
index be7223f..d1f4641 100644
--- a/docs/_docs/sql-reference/operational-commands.adoc
+++ b/docs/_docs/sql-reference/operational-commands.adoc
@@ -334,7 +334,7 @@ KILL CONTINUOUS
'6fa749ee-7cf8-4635-be10-36a1c75267a7_54321' '6fa749ee-7cf8-4635
== KILL SERVICE
-The `KILL SERVICE` command allows you to cance a running service.
+The `KILL SERVICE` command allows you to cancel a running service.
[tabs]
--
@@ -370,3 +370,25 @@ control.bat --kill SERVICE name
* `name` - corresponds to the name you selected for the service upon the
deployment time.
You can always find it with the
link:monitoring-metrics/system-views#services[SERVICES] view.
+
+
+== KILL CONSISTENCY repair/check operations
+
+The `KILL CONSISTENCY` command allows you to cancel all running consistency
repair/check operations.
+
+[tabs]
+--
+
+tab:Unix[]
+[source,bash]
+----
+./control.sh --kill CONSISTENCY
+----
+
+tab:Windows[]
+[source,bash]
+----
+control.bat --kill CONSISTENCY
+----
+
+--
diff --git
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
index a1f56c4..10ef23a 100644
---
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
+++
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.commandline.consistency;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Logger;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.commandline.AbstractCommand;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.commandline.Command;
import org.apache.ignite.internal.commandline.CommandArgIterator;
import org.apache.ignite.internal.commandline.CommandLogger;
import
org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskArg;
+import
org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskResult;
import static org.apache.ignite.internal.commandline.CommandList.CONSISTENCY;
import static
org.apache.ignite.internal.commandline.TaskExecutor.BROADCAST_UUID;
@@ -46,8 +48,12 @@ public class ConsistencyCommand extends
AbstractCommand<VisorConsistencyRepairTa
/** {@inheritDoc} */
@Override public Object execute(GridClientConfiguration clientCfg, Logger
log) throws Exception {
+ boolean failed = false;
+
+ StringBuilder sb = new StringBuilder();
+
try (GridClient client = Command.startClient(clientCfg)) {
- Object res = executeTaskByNameOnNode(
+ VisorConsistencyRepairTaskResult res = executeTaskByNameOnNode(
client,
cmd.taskName(),
arg(),
@@ -55,9 +61,22 @@ public class ConsistencyCommand extends
AbstractCommand<VisorConsistencyRepairTa
clientCfg
);
- log.info(String.valueOf(res));
+ if (res.cancelled()) {
+ sb.append("Operation execution cancelled.\n\n");
+
+ failed = true;
+ }
+
+ if (res.failed()) {
+ sb.append("Operation execution failed.\n\n");
+
+ failed = true;
+ }
- return res;
+ if (failed)
+ sb.append("[EXECUTION FAILED OR CANCELLED, RESULTS MAY BE
INCOMPLETE OR INCONSISTENT]\n\n");
+
+ sb.append(res.message());
}
catch (Throwable e) {
log.severe("Failed to perform operation.");
@@ -65,6 +84,15 @@ public class ConsistencyCommand extends
AbstractCommand<VisorConsistencyRepairTa
throw e;
}
+
+ String output = sb.toString();
+
+ if (failed)
+ throw new IgniteCheckedException(output);
+ else
+ log.info(output);
+
+ return output;
}
/** {@inheritDoc} */
diff --git
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
index 7682113..12926aa 100644
---
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
+++
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.commandline.CommandLogger;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.visor.compute.VisorComputeCancelSessionTask;
import
org.apache.ignite.internal.visor.compute.VisorComputeCancelSessionTaskArg;
+import org.apache.ignite.internal.visor.consistency.VisorConsistencyCancelTask;
import org.apache.ignite.internal.visor.query.VisorContinuousQueryCancelTask;
import
org.apache.ignite.internal.visor.query.VisorContinuousQueryCancelTaskArg;
import org.apache.ignite.internal.visor.query.VisorQueryCancelOnInitiatorTask;
@@ -51,6 +52,7 @@ import org.apache.ignite.mxbean.TransactionsMXBean;
import static java.util.Collections.singletonMap;
import static
org.apache.ignite.internal.QueryMXBeanImpl.EXPECTED_GLOBAL_QRY_ID_FORMAT;
import static org.apache.ignite.internal.commandline.CommandList.KILL;
+import static
org.apache.ignite.internal.commandline.TaskExecutor.BROADCAST_UUID;
import static
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
import static
org.apache.ignite.internal.commandline.query.KillSubcommand.COMPUTE;
import static
org.apache.ignite.internal.commandline.query.KillSubcommand.CONTINUOUS;
@@ -77,6 +79,9 @@ public class KillCommand extends AbstractCommand<Object> {
/** Task name. */
private String taskName;
+ /** Node id. */
+ private UUID nodeId;
+
/** {@inheritDoc} */
@Override public Object execute(GridClientConfiguration clientCfg, Logger
log) throws Exception {
try (GridClient client = Command.startClient(clientCfg)) {
@@ -84,7 +89,7 @@ public class KillCommand extends AbstractCommand<Object> {
client,
taskName,
taskArgs,
- null,
+ nodeId,
clientCfg
);
}
@@ -119,6 +124,8 @@ public class KillCommand extends AbstractCommand<Object> {
taskName = VisorComputeCancelSessionTask.class.getName();
+ nodeId = null;
+
break;
case SERVICE:
@@ -126,6 +133,8 @@ public class KillCommand extends AbstractCommand<Object> {
taskName = VisorCancelServiceTask.class.getName();
+ nodeId = null;
+
break;
case TRANSACTION:
@@ -136,6 +145,8 @@ public class KillCommand extends AbstractCommand<Object> {
taskName = VisorTxTask.class.getName();
+ nodeId = null;
+
break;
case SQL:
@@ -148,6 +159,8 @@ public class KillCommand extends AbstractCommand<Object> {
taskName = VisorQueryCancelOnInitiatorTask.class.getName();
+ nodeId = null;
+
break;
case SCAN:
@@ -163,6 +176,8 @@ public class KillCommand extends AbstractCommand<Object> {
taskName = VisorScanQueryCancelTask.class.getName();
+ nodeId = null;
+
break;
case CONTINUOUS:
@@ -172,6 +187,8 @@ public class KillCommand extends AbstractCommand<Object> {
taskName = VisorContinuousQueryCancelTask.class.getName();
+ nodeId = null;
+
break;
case SNAPSHOT:
@@ -179,6 +196,17 @@ public class KillCommand extends AbstractCommand<Object> {
taskName = VisorSnapshotCancelTask.class.getName();
+ nodeId = null;
+
+ break;
+
+ case CONSISTENCY:
+ taskName = VisorConsistencyCancelTask.class.getName();
+
+ taskArgs = null;
+
+ nodeId = BROADCAST_UUID;
+
break;
default:
diff --git
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
index 311ab96..2c70166 100644
---
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
+++
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
@@ -54,4 +54,7 @@ public enum KillSubcommand {
/** Kill snapshot operation. */
SNAPSHOT,
+
+ /** Kill consistency tasks. */
+ CONSISTENCY,
}
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
index ce84c9a..4358765 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
@@ -73,6 +73,7 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTA
import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
import static
org.apache.ignite.configuration.EncryptionConfiguration.DFLT_REENCRYPTION_BATCH_SIZE;
import static
org.apache.ignite.configuration.EncryptionConfiguration.DFLT_REENCRYPTION_RATE_MBPS;
+import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION;
import static
org.apache.ignite.internal.encryption.AbstractEncryptionTest.KEYSTORE_PASSWORD;
import static
org.apache.ignite.internal.encryption.AbstractEncryptionTest.KEYSTORE_PATH;
import static
org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsDumpTask.IDLE_DUMP_FILE_PREFIX;
@@ -250,6 +251,8 @@ public abstract class GridCommandHandlerAbstractTest
extends GridCommonAbstractT
cfg.setDaemon(igniteInstanceName.startsWith(DAEMON_NODE_NAME_PREFIX));
+ cfg.setIncludeEventTypes(EVT_CONSISTENCY_VIOLATION); // Extend if
necessary.
+
if (encryptionEnabled) {
KeystoreEncryptionSpi encSpi = new KeystoreEncryptionSpi();
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
index 6643d9d..6002439 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
@@ -42,6 +42,7 @@ import static
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
import static
org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
@@ -171,11 +172,26 @@ public class GridCommandHandlerConsistencyTest extends
GridCommandHandlerCluster
/**
*
*/
+ @Test
+ public void testRepairNonExistentCache() throws Exception {
+ startGrids(3);
+
+ injectTestSystemOut();
+
+ for (int i = 0; i < PARTITIONS; i++) {
+ assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--consistency",
"repair", "non-existent", String.valueOf(i)));
+ assertContains(log, testOut.toString(), "Cache not found");
+ }
+ }
+
+ /**
+ *
+ */
private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
for (int i = 0; i < PARTITIONS; i++) {
assertEquals(EXIT_CODE_OK, execute("--consistency", "repair",
cacheName, String.valueOf(i)));
assertContains(log, testOut.toString(),
CONSISTENCY_VIOLATIONS_FOUND);
- assertContains(log, testOut.toString(), "[found=1, fixed=1]");
+ assertContains(log, testOut.toString(), "[found=1, fixed=1");
assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
@@ -196,7 +212,7 @@ public class GridCommandHandlerConsistencyTest extends
GridCommandHandlerCluster
for (int i = 0; i < PARTITIONS; i++) { // This may be a copy of
previous (tx case), implement atomic repair to make this happen :)
assertEquals(EXIT_CODE_OK, execute("--consistency", "repair",
cacheName, String.valueOf(i)));
assertContains(log, testOut.toString(),
CONSISTENCY_VIOLATIONS_FOUND);
- assertContains(log, testOut.toString(), "[found=1, fixed=0]"); //
Nothing fixed.
+ assertContains(log, testOut.toString(), "[found=1, fixed=0"); //
Nothing fixed.
assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
assertContains(log, testOut.toString(),
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
index 7f6f8d9..6501359 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
@@ -20,15 +20,29 @@ package org.apache.ignite.util;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.systemview.view.ComputeJobView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotSelfTest.doSnapshotCancellationTest;
+import static
org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.util.KillCommandsTests.PAGES_CNT;
import static org.apache.ignite.util.KillCommandsTests.PAGE_SZ;
import static org.apache.ignite.util.KillCommandsTests.doTestCancelComputeTask;
@@ -195,4 +209,97 @@ public class KillCommandsCommandShTest extends
GridCommandHandlerClusterByClassA
assertEquals(EXIT_CODE_OK, res);
}
+
+ /** */
+ @Test
+ public void testCancelConsistencyMissedTask() {
+ int res = execute("--kill", "consistency");
+
+ assertEquals(EXIT_CODE_OK, res);
+ }
+
+ /** */
+ @Test
+ public void testCancelConsistencyTask() throws InterruptedException {
+ String consistencyCancheName = "consistencyCache";
+
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+ cfg.setName(consistencyCancheName);
+ cfg.setBackups(SERVER_NODE_CNT - 1);
+
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache(cfg);
+
+ for (int i = 0; i < 10_000; i++)
+ cache.put(i, i);
+
+ AtomicInteger getCnt = new AtomicInteger();
+
+ CountDownLatch thLatch = new CountDownLatch(1);
+
+ Thread th = new Thread(() -> {
+ IgnitePredicate<ComputeJobView> repairJobFilter =
+ job ->
job.taskClassName().equals(VisorConsistencyRepairTask.class.getName());
+
+ for (IgniteEx node : srvs) {
+ SystemView<ComputeJobView> jobs =
node.context().systemView().view(JOBS_VIEW);
+
+ assertTrue(F.iterator0(jobs, true,
repairJobFilter).hasNext()); // Found.
+ }
+
+ int res = execute("--kill", "consistency");
+
+ assertEquals(EXIT_CODE_OK, res);
+
+ try {
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ for (IgniteEx node : srvs) {
+ SystemView<ComputeJobView> jobs =
node.context().systemView().view(JOBS_VIEW);
+
+ if (F.iterator0(jobs, true,
repairJobFilter).hasNext()) // Found.
+ return false;
+ }
+
+ return true;
+ }, 5000L)); // Missed.
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ fail();
+ }
+
+ thLatch.countDown();
+ });
+
+ for (IgniteEx server : srvs) {
+ TestRecordingCommunicationSpi spi =
+
((TestRecordingCommunicationSpi)server.configuration().getCommunicationSpi());
+
+ spi.blockMessages((node, message) -> {
+ if (message instanceof GridNearGetRequest) { // Get request
caused by read repair operation.
+ if (getCnt.incrementAndGet() == SERVER_NODE_CNT) // Each
node should send a get request.
+ th.start();
+
+ return true; // Blocking to freeze '--consistency repair'
operation.
+ }
+
+ return false;
+ });
+ }
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--consistency",
"repair", consistencyCancheName, "0"));
+
+ assertContains(log, testOut.toString(), "Operation execution
cancelled.");
+ assertContains(log, testOut.toString(), "violations were NOT found
[processed=0]");
+
+ thLatch.await();
+
+ for (IgniteEx server : srvs) { // Restoring messaging for other tests.
+ TestRecordingCommunicationSpi spi =
+
((TestRecordingCommunicationSpi)server.configuration().getCommunicationSpi());
+
+ spi.stopBlock();
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index f78be69..fa8d670 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -744,8 +744,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
*/
public void cancel(boolean sys) {
try {
- super.cancel();
-
final ComputeJob job0 = job;
if (sys)
@@ -764,6 +762,10 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
});
}
+ // Interrupting only when all 'cancelled' flags are set.
+ // This allows the 'job' to determine it's a cancellation.
+ super.cancel();
+
if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java
new file mode 100644
index 0000000..1287f8a
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.consistency;
+
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.systemview.view.ComputeJobView;
+
+import static
org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW;
+
+/**
+ * Cancels given consistency repairs on all cluster nodes.
+ */
+public class VisorConsistencyCancelTask extends VisorMultiNodeTask<Void, Void,
Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorConsistencyCancelJob job(Void arg) {
+ return new VisorConsistencyCancelJob(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Void reduce0(List<ComputeJobResult> results) {
+ // No-op, just awaiting all jobs done.
+ return null;
+ }
+
+ /**
+ * Job that cancels the tasks.
+ */
+ private static class VisorConsistencyCancelJob extends VisorJob<Void,
Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Auto-injected grid instance.
+ */
+ @IgniteInstanceResource
+ private transient IgniteEx ignite;
+
+ /**
+ * Default constructor.
+ */
+ protected VisorConsistencyCancelJob(Void arg, boolean debug) {
+ super(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Void run(Void arg) throws IgniteException {
+ F.iterator(ignite.context().systemView().view(JOBS_VIEW),
+ ComputeJobView::sessionId,
+ true,
+ job ->
job.taskClassName().equals(VisorConsistencyRepairTask.class.getName())
+ ).forEach(sesId -> ignite.context().job().cancelJob(sesId, null,
false));
+
+ return null;
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
index d8070f5..a43a6b0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
@@ -31,11 +31,13 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.events.CacheConsistencyViolationEvent;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import
org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteConsistencyViolationException;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
import org.apache.ignite.lang.IgnitePredicate;
@@ -46,12 +48,13 @@ import static
org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION;
/**
*
*/
-public class VisorConsistencyRepairTask extends
VisorMultiNodeTask<VisorConsistencyRepairTaskArg, String, String> {
+public class VisorConsistencyRepairTask extends
+ VisorMultiNodeTask<VisorConsistencyRepairTaskArg,
VisorConsistencyRepairTaskResult, String> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** Nothing found. */
- private static final String NOTHING_FOUND = "Consistency violations were
not found.";
+ private static final String NOTHING_FOUND = "Consistency violations were
NOT found";
/** Found. */
public static final String CONSISTENCY_VIOLATIONS_FOUND = "Consistency
violations were FOUND";
@@ -65,18 +68,34 @@ public class VisorConsistencyRepairTask extends
VisorMultiNodeTask<VisorConsiste
}
/** {@inheritDoc} */
- @Override protected String reduce0(List<ComputeJobResult> results) throws
IgniteException {
+ @Override protected VisorConsistencyRepairTaskResult
reduce0(List<ComputeJobResult> results) throws IgniteException {
+ VisorConsistencyRepairTaskResult taskRes = new
VisorConsistencyRepairTaskResult();
StringBuilder sb = new StringBuilder();
for (ComputeJobResult res : results) {
+ if (res.isCancelled())
+ taskRes.cancelled(true);
+
+ Exception e = res.getException();
+
+ if (e != null) {
+ taskRes.failed(true);
+
+ sb.append("Node: ").append(res.getNode()).append("\n")
+ .append(" Exception: ").append(e).append("\n")
+ .append(X.getFullStackTrace(e)).append("\n");
+ }
+
String data = res.getData();
if (data != null)
sb.append("Node: ").append(res.getNode()).append("\n")
- .append(" Result: ").append(data).append("\n");
+ .append(" Result: ").append(data).append("\n\n");
}
- return sb.toString();
+ taskRes.message(sb.toString());
+
+ return taskRes;
}
/**
@@ -107,7 +126,15 @@ public class VisorConsistencyRepairTask extends
VisorMultiNodeTask<VisorConsiste
int p = arg.part();
int batchSize = 1024;
- GridCacheContext<Object, Object> cctx =
ignite.context().cache().cache(cacheName).context();
+ IgniteInternalCache<Object, Object> internalCache =
ignite.context().cache().cache(cacheName);
+
+ if (internalCache == null)
+ if (ignite.context().cache().cacheDescriptor(cacheName) !=
null)
+ return null; // Node filtered by node filter.
+ else
+ throw new IgniteException("Cache not found [name=" +
cacheName + "]");
+
+ GridCacheContext<Object, Object> cctx = internalCache.context();
if (!cctx.gridEvents().isRecordable(EVT_CONSISTENCY_VIOLATION))
throw new UnsupportedOperationException("Consistency violation
events recording is disabled on cluster.");
@@ -119,6 +146,8 @@ public class VisorConsistencyRepairTask extends
VisorMultiNodeTask<VisorConsiste
if (part == null)
return null; // Partition does not belong to the node.
+ long cnt = 0;
+
part.reserve();
try {
@@ -144,13 +173,15 @@ public class VisorConsistencyRepairTask extends
VisorMultiNodeTask<VisorConsiste
try {
cache.getAll(keys); // Repair.
+
+ cnt += keys.size();
}
catch (CacheException e) {
- if (!(e.getCause() instanceof
IgniteConsistencyViolationException))
+ if (!(e.getCause() instanceof
IgniteConsistencyViolationException) && !isCancelled())
throw new IgniteException("Read repair attempt
failed.", e);
}
}
- while (!keys.isEmpty());
+ while (!keys.isEmpty() && !isCancelled());
}
finally {
ignite.events().stopLocalListen(lsnr);
@@ -164,15 +195,15 @@ public class VisorConsistencyRepairTask extends
VisorMultiNodeTask<VisorConsiste
}
if (!evts.isEmpty())
- return processEvents(cctx, p);
+ return processEvents(cctx, p, cnt);
else
- return NOTHING_FOUND;
+ return NOTHING_FOUND + " [processed=" + cnt + "]";
}
/**
*
*/
- private String processEvents(GridCacheContext<Object, Object> cctx,
int part) {
+ private String processEvents(GridCacheContext<Object, Object> cctx,
int part, long cnt) {
int found = 0;
int fixed = 0;
@@ -211,10 +242,10 @@ public class VisorConsistencyRepairTask extends
VisorMultiNodeTask<VisorConsiste
if (!res.isEmpty()) {
log.warning(CONSISTENCY_VIOLATIONS_RECORDED + "\n" + res);
- return CONSISTENCY_VIOLATIONS_FOUND + " [found=" + found + ",
fixed=" + fixed + "]";
+ return CONSISTENCY_VIOLATIONS_FOUND + " [found=" + found + ",
fixed=" + fixed + ", processed=" + cnt + "]";
}
else
- return NOTHING_FOUND;
+ return NOTHING_FOUND + " [processed=" + cnt + "]";
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java
new file mode 100644
index 0000000..51e6d64
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.consistency;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class VisorConsistencyRepairTaskResult extends IgniteDataTransferObject
{
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Result. */
+ private String msg;
+
+ /** Failed. */
+ private boolean failed;
+
+ /** Cancelled. */
+ private boolean cancelled;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
+ U.writeString(out, msg);
+ out.writeBoolean(failed);
+ out.writeBoolean(cancelled);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void readExternalData(byte protoVer,
+ ObjectInput in) throws IOException, ClassNotFoundException {
+ msg = U.readString(in);
+ failed = in.readBoolean();
+ cancelled = in.readBoolean();
+ }
+
+ /**
+ * @return Result.
+ */
+ public String message() {
+ return msg;
+ }
+
+ /**
+ * @param res New result.
+ */
+ public void message(String res) {
+ this.msg = res;
+ }
+
+ /**
+ * @return Failed.
+ */
+ public boolean failed() {
+ return failed;
+ }
+
+ /**
+ * @param failed Failed.
+ */
+ public void failed(boolean failed) {
+ this.failed = failed;
+ }
+
+ /**
+ * @return Cancelled.
+ */
+ public boolean cancelled() {
+ return cancelled;
+ }
+
+ /**
+ * @param cancelled New cancelled.
+ */
+ public void cancelled(boolean cancelled) {
+ this.cancelled = cancelled;
+ }
+}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index b787705..419c039 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2010,6 +2010,8 @@
org.apache.ignite.internal.visor.baseline.VisorBaselineTaskArg
org.apache.ignite.internal.visor.baseline.VisorBaselineTaskResult
org.apache.ignite.internal.visor.baseline.VisorBaselineViewTask
org.apache.ignite.internal.visor.baseline.VisorBaselineViewTask$VisorBaselineViewJob
+org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskArg
+org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskResult
org.apache.ignite.internal.visor.misc.VisorIdAndTagViewTaskResult
org.apache.ignite.internal.visor.misc.VisorClusterChangeTagTaskResult
org.apache.ignite.internal.visor.shutdown.VisorShutdownPolicyTask