This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a29e38c8e5 IGNITE-18641 Control.sh: add a command to delete segment 
CDC links before last gap (#10499)
2a29e38c8e5 is described below

commit 2a29e38c8e5b383d61975e54c55c288d5c4be31f
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Feb 2 12:09:25 2023 +0300

    IGNITE-18641 Control.sh: add a command to delete segment CDC links before 
last gap (#10499)
---
 docs/_docs/persistence/change-data-capture.adoc    |  31 ++-
 .../internal/commandline/CommandArgIterator.java   |  15 ++
 .../ignite/internal/commandline/CommandList.java   |   6 +-
 .../internal/commandline/CommonArgParser.java      |   2 +-
 .../internal/commandline/cdc/CdcCommand.java       | 140 +++++++++++++
 .../internal/commandline/metric/MetricCommand.java |  15 +-
 .../commandline/CommandHandlerParsingTest.java     |  17 +-
 .../testsuites/IgniteControlUtilityTestSuite2.java |   5 +-
 .../org/apache/ignite/util/CdcCommandTest.java     | 225 +++++++++++++++++++++
 .../util/GridCommandHandlerAbstractTest.java       |  15 ++
 .../util/GridCommandHandlerClusterByClassTest.java |   8 +-
 .../org/apache/ignite/util/MetricCommandTest.java  |  17 +-
 .../apache/ignite/util/SystemViewCommandTest.java  |  15 --
 .../org/apache/ignite/internal/cdc/CdcMain.java    |   6 +-
 .../persistence/wal/FileWriteAheadLogManager.java  |   5 +
 .../visor/cdc/VisorCdcDeleteLostSegmentsTask.java  | 171 ++++++++++++++++
 .../java/org/apache/ignite/cdc/CdcSelfTest.java    |  16 ++
 ...ridCommandHandlerClusterByClassTest_help.output |   7 +
 ...andHandlerClusterByClassWithSSLTest_help.output |   7 +
 19 files changed, 669 insertions(+), 54 deletions(-)

diff --git a/docs/_docs/persistence/change-data-capture.adoc 
b/docs/_docs/persistence/change-data-capture.adoc
index adbfe1e4706..f04fffaee55 100644
--- a/docs/_docs/persistence/change-data-capture.adoc
+++ b/docs/_docs/persistence/change-data-capture.adoc
@@ -142,7 +142,36 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast 
approach. It just fails in c
  5. Infinitely wait for the newly available segment and process it.
  6. Stop the consumer in case of a failure or a received stop signal.
 
+== Handling skipped segments
+
+CDC can be disabled manually or automatically when the configured maximum size of the CDC directory is exceeded. In this case, hard link creation is skipped.
+
+WARNING: All changes in skipped segments will be lost!
+
+So when CDC is enabled again, there will be a gap between segments, for example: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`.
+In this case `ignite-cdc.sh` will fail with an error like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]".
+
+NOTE: Remember to synchronize data before restarting the CDC application. You can synchronize caches using
+snapshot or other methods.
+
+To fix this error you can run the following link:tools/control-script[Control 
Script] command:
+
+[source,shell]
+----
+# Delete lost segment CDC links in the cluster.
+control.sh|bat --cdc delete_lost_segment_links
+
+# Delete lost segment CDC links on a node.
+control.sh|bat --cdc delete_lost_segment_links --node-id node_id
+----
+
+The command will remove all segment links before the last gap.
+
+For example, CDC was turned off several times, leaving the following segment links: `0000000000000002.wal`, `0000000000000003.wal`, `0000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal`.
+Then, after the command is executed, the following segment links will be deleted: `0000000000000002.wal`, `0000000000000003.wal`, `0000000000000008.wal`.
+The application will start from the `0000000000000010.wal` segment after being enabled.
+
 == cdc-ext
 
 Ignite extensions project has 
link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext]
 module which provides two way to setup cross cluster replication based on CDC.
-Detailed documentation can be found on 
link:extensions-and-integrations/change-data-capture-extensions[page].
\ No newline at end of file
+Detailed documentation can be found on 
link:extensions-and-integrations/change-data-capture-extensions[page].
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java
index 0e1e89b5073..892f87e0166 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.NotNull;
 
@@ -149,6 +150,20 @@ public class CommandArgIterator {
         }
     }
 
+    /**
+     * Reads the next argument and converts it to a {@link UUID}.
+     *
+     * @param argName Name of the argument, used in error messages only.
+     * @return UUID value.
+     */
+    public UUID nextUuidArg(String argName) {
+        String raw = nextArg("Expecting " + argName + " command argument.");
+
+        try {
+            return UUID.fromString(raw);
+        }
+        catch (IllegalArgumentException cause) {
+            String msg = "Failed to parse " + argName + " command argument." +
+                " String representation of \"java.util.UUID\" is exepected. For example:" +
+                " 123e4567-e89b-42d3-a456-556642440000";
+
+            throw new IllegalArgumentException(msg, cause);
+        }
+    }
+
     /**
      * @param argName Name of argument.
      */
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
index df93519469f..2c7b78ce9bd 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.commandline;
 
 import org.apache.ignite.internal.commandline.cache.CacheCommands;
+import org.apache.ignite.internal.commandline.cdc.CdcCommand;
 import org.apache.ignite.internal.commandline.consistency.ConsistencyCommand;
 import org.apache.ignite.internal.commandline.diagnostic.DiagnosticCommand;
 import org.apache.ignite.internal.commandline.encryption.EncryptionCommands;
@@ -103,7 +104,10 @@ public enum CommandList {
     PERFORMANCE_STATISTICS("--performance-statistics", new 
PerformanceStatisticsCommand()),
 
     /** Command to check/repair consistency. */
-    CONSISTENCY("--consistency", new ConsistencyCommand());
+    CONSISTENCY("--consistency", new ConsistencyCommand()),
+
+    /** Cdc commands. */
+    CDC("--cdc", new CdcCommand());
 
     /** Private values copy so there's no need in cloning it every time. */
     private static final CommandList[] VALUES = CommandList.values();
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java
index 2acdc404052..238a3a78046 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java
@@ -57,7 +57,7 @@ public class CommonArgParser {
     static final String CMD_USER = "--user";
 
     /** Option is used for auto confirmation. */
-    static final String CMD_AUTO_CONFIRMATION = "--yes";
+    public static final String CMD_AUTO_CONFIRMATION = "--yes";
 
     /** */
     static final String CMD_PING_INTERVAL = "--ping-interval";
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
new file mode 100644
index 00000000000..8dbd2b963ab
--- /dev/null
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.cdc;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientNode;
+import org.apache.ignite.internal.commandline.AbstractCommand;
+import org.apache.ignite.internal.commandline.Command;
+import org.apache.ignite.internal.commandline.CommandArgIterator;
+import org.apache.ignite.internal.commandline.CommandLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;
+
+import static org.apache.ignite.internal.commandline.CommandList.CDC;
+import static org.apache.ignite.internal.commandline.CommandLogger.optional;
+import static 
org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION;
+import static 
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
+
+/**
+ * Control utility command for CDC maintenance.
+ * <p>
+ * The only supported sub-command is {@link #DELETE_LOST_SEGMENT_LINKS}, which can be limited to a single node
+ * with the {@link #NODE_ID} argument.
+ */
+public class CdcCommand extends AbstractCommand<String> {
+    /** Command to delete lost segment links. */
+    public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links";
+
+    /** Name of the optional argument that holds the target node ID. */
+    public static final String NODE_ID = "--node-id";
+
+    /** Node ID, or {@code null} if {@link #NODE_ID} was not specified. */
+    private UUID nodeId;
+
+    /** {@inheritDoc} */
+    @Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception {
+        try (GridClient client = Command.startClient(clientCfg)) {
+            // NOTE(review): the task is submitted here and once more through client.compute() below.
+            // Confirm that both executions are required before removing either of them.
+            executeTaskByNameOnNode(
+                client,
+                VisorCdcDeleteLostSegmentsTask.class.getName(),
+                null,
+                nodeId,
+                clientCfg
+            );
+
+            // Explicitly requested node, or all nodes that do not report themselves as clients.
+            Collection<UUID> nodeIds = nodeId != null ? Collections.singletonList(nodeId) :
+                client.compute().nodes(node -> !node.isClient()).stream().map(GridClientNode::nodeId)
+                    .collect(Collectors.toSet());
+
+            client.compute().execute(VisorCdcDeleteLostSegmentsTask.class.getName(),
+                new VisorTaskArgument<>(nodeIds, false));
+
+            String res = "Lost segment CDC links successfully removed.";
+
+            log.info(res);
+
+            return res;
+        }
+        catch (Throwable e) {
+            log.error("Failed to perform operation.");
+            log.error(CommandLogger.errorMessage(e));
+
+            throw e;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws IllegalArgumentException If the sub-command is not {@link #DELETE_LOST_SEGMENT_LINKS} or
+     *      an unknown sub-argument is passed.
+     */
+    @Override public void parseArguments(CommandArgIterator argIter) {
+        // Reset state left from a previous parse.
+        nodeId = null;
+
+        String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS);
+
+        if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd))
+            throw new IllegalArgumentException("Unexpected command: " + cmd);
+
+        while (argIter.hasNextSubArg()) {
+            String opt = argIter.nextArg("Failed to read command argument.");
+
+            if (NODE_ID.equalsIgnoreCase(opt))
+                nodeId = argIter.nextUuidArg(NODE_ID);
+            else {
+                // The command deletes data, so a mistyped option must not be silently ignored.
+                throw new IllegalArgumentException("Unexpected argument: " + opt);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String confirmationPrompt() {
+        return "Warning: The command will fix WAL segments gap in case CDC link creation was stopped by distributed " +
+            "property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segment links " +
+            "previous to the last gap." + U.nl() +
+            "All changes in deleted segment links will be lost!" + U.nl() +
+            "Make sure you need to sync data before restarting the CDC application. You can synchronize caches " +
+            "using snapshot or other methods.";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String arg() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printUsage(IgniteLogger logger) {
+        // Insertion-ordered map of parameter name to its description.
+        Map<String, String> params = new LinkedHashMap<>();
+
+        params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " +
+            "all server nodes.");
+
+        usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS,
+            optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "cdc";
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean experimental() {
+        return true;
+    }
+}
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java
index 3e8d22bf2ed..6dd8c600387 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java
@@ -106,19 +106,8 @@ public class MetricCommand extends 
AbstractCommand<VisorMetricTaskArg> {
 
             MetricCommandArg cmdArg = CommandArgUtils.of(arg, 
MetricCommandArg.class);
 
-            if (cmdArg == NODE_ID) {
-                String nodeIdArg = argIter.nextArg(
-                    "ID of the node from which metric values should be 
obtained is expected.");
-
-                try {
-                    nodeId = UUID.fromString(nodeIdArg);
-                }
-                catch (IllegalArgumentException e) {
-                    throw new IllegalArgumentException("Failed to parse " + 
NODE_ID + " command argument." +
-                        " String representation of \"java.util.UUID\" is 
exepected. For example:" +
-                        " 123e4567-e89b-42d3-a456-556642440000", e);
-                }
-            }
+            if (cmdArg == NODE_ID)
+                nodeId = argIter.nextUuidArg(NODE_ID.argName());
             else if (cmdArg == CONFIGURE_HISTOGRAM || cmdArg == 
CONFIGURE_HITRATE) {
                 if (metricName != null) {
                     throw new IllegalArgumentException(
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
index fa478da5fd2..c2ee56ce044 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
@@ -64,6 +64,7 @@ import static java.util.Collections.singletonList;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND;
 import static 
org.apache.ignite.internal.QueryMXBeanImpl.EXPECTED_GLOBAL_QRY_ID_FORMAT;
 import static org.apache.ignite.internal.commandline.CommandList.CACHE;
+import static org.apache.ignite.internal.commandline.CommandList.CDC;
 import static 
org.apache.ignite.internal.commandline.CommandList.CLUSTER_CHANGE_TAG;
 import static org.apache.ignite.internal.commandline.CommandList.SET_STATE;
 import static 
org.apache.ignite.internal.commandline.CommandList.SHUTDOWN_POLICY;
@@ -78,6 +79,8 @@ import static 
org.apache.ignite.internal.commandline.cache.CacheSubcommands.FIND
 import static 
org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES;
 import static 
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST;
 import static 
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH;
+import static 
org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
+import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -412,6 +415,8 @@ public class CommandHandlerParsingTest {
                 args = parseArgs(asList(cmdL.text(), "newTagValue"));
             else if (cmdL == WARM_UP)
                 args = parseArgs(asList(cmdL.text(), "--stop"));
+            else if (cmdL == CDC)
+                args = parseArgs(asList(cmdL.text(), 
DELETE_LOST_SEGMENT_LINKS, NODE_ID, UUID.randomUUID().toString()));
             else
                 args = parseArgs(asList(cmdL.text()));
 
@@ -497,6 +502,15 @@ public class CommandHandlerParsingTest {
                     break;
                 }
 
+                case CDC: {
+                    args = parseArgs(asList(cmdL.text(), 
DELETE_LOST_SEGMENT_LINKS,
+                        NODE_ID, UUID.randomUUID().toString(), "--yes"));
+
+                    checkCommonParametersCorrectlyParsed(cmdL, args, true);
+
+                    break;
+                }
+
                 default:
                     fail("Unknown command: " + cmd);
             }
@@ -1232,6 +1246,7 @@ public class CommandHandlerParsingTest {
             cmd == CommandList.METRIC ||
             cmd == CommandList.DEFRAGMENTATION ||
             cmd == CommandList.PERFORMANCE_STATISTICS ||
-            cmd == CommandList.CONSISTENCY;
+            cmd == CommandList.CONSISTENCY ||
+            cmd == CDC;
     }
 }
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
index 3b7ac34cb5e..ce8571c4ef4 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.commandline.indexreader.IgniteIndexReaderTest;
 import org.apache.ignite.util.CacheMetricsCommandTest;
+import org.apache.ignite.util.CdcCommandTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest;
 import 
org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest;
@@ -64,7 +65,9 @@ import org.junit.runners.Suite;
     PerformanceStatisticsCommandTest.class,
     CacheMetricsCommandTest.class,
 
-    IgniteIndexReaderTest.class
+    IgniteIndexReaderTest.class,
+
+    CdcCommandTest.class
 })
 public class IgniteControlUtilityTestSuite2 {
 }
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
new file mode 100644
index 00000000000..d920e08ab29
--- /dev/null
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.commandline.CommandList;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
+import static org.apache.ignite.cdc.CdcSelfTest.addData;
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
+import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
+import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
+import static 
org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
+import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+
+/**
+ * CDC command tests.
+ * <p>
+ * Each test runs against two persistent server nodes with CDC enabled and a forced WAL archive timeout.
+ */
+public class CdcCommandTest extends GridCommandHandlerAbstractTest {
+    /** First server node. */
+    private IgniteEx srv0;
+
+    /** Second server node. */
+    private IgniteEx srv1;
+
+    /** Distributed property read from {@link FileWriteAheadLogManager#CDC_DISABLED}. */
+    private DistributedChangeableProperty<Serializable> cdcDisabled;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setBackups(1));
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalForceArchiveTimeout(1000)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setCdcEnabled(true)));
+
+        cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+
+        srv0 = startGrid(0);
+        srv1 = startGrid(1);
+
+        cdcDisabled = srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** Checks that invalid command line arguments are rejected. */
+    @Test
+    public void testParseDeleteLostSegmentLinks() {
+        injectTestSystemOut();
+
+        assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+                CommandList.CDC.text(), "unexpected_command"),
+            "Unexpected command: unexpected_command");
+
+        assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+                CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID),
+            "Failed to parse " + NODE_ID + " command argument.");
+
+        assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+                CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, "10"),
+            "Failed to parse " + NODE_ID + " command argument.");
+    }
+
+    /** Checks that the command fails while a CDC application is running for the node. */
+    @Test
+    public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception {
+        injectTestSystemOut();
+
+        CountDownLatch appStarted = new CountDownLatch(1);
+
+        CdcConfiguration cfg = new CdcConfiguration();
+
+        cfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() {
+            @Override public void start(MetricRegistry mreg) {
+                appStarted.countDown();
+            }
+        });
+
+        CdcMain cdc = new CdcMain(getConfiguration(getTestIgniteInstanceName(0)), null, cfg);
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(cdc);
+
+        // Fail fast if the consumer never started instead of silently continuing after the timeout.
+        assertTrue(appStarted.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+
+        assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+                CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()),
+            "Failed to delete lost segment CDC links. Unable to acquire lock to lock CDC folder.");
+
+        assertFalse(fut.isDone());
+
+        fut.cancel();
+    }
+
+    /** */
+    @Test
+    public void testDeleteLostSegmentLinks() throws Exception {
+        checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), true);
+    }
+
+    /** */
+    @Test
+    public void testDeleteLostSegmentLinksOneNode() throws Exception {
+        checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), false);
+    }
+
+    /** */
+    @Test
+    public void testDeleteLostSegmentLinksMultipleGaps() throws Exception {
+        checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true);
+    }
+
+    /**
+     * Archives segments with gaps, runs the command and checks the remaining links.
+     *
+     * @param expBefore Segment indexes whose links are expected before the command is executed.
+     * @param expAfter Segment indexes whose links are expected after the command is executed.
+     * @param allNodes If {@code true} the command is run without {@code --node-id}, otherwise for the first node only.
+     */
+    private void checkDeleteLostSegmentLinks(List<Long> expBefore, List<Long> expAfter, boolean allNodes)
+        throws Exception {
+        archiveSegmentLinks(expBefore);
+
+        checkLinks(srv0, expBefore);
+        checkLinks(srv1, expBefore);
+
+        String[] args = allNodes ? new String[] {CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS} :
+            new String[] {CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()};
+
+        executeCommand(EXIT_CODE_OK, args);
+
+        checkLinks(srv0, expAfter);
+        checkLinks(srv1, allNodes ? expAfter : expBefore);
+    }
+
+    /**
+     * Checks that the CDC directory of the node contains links for exactly the given segments.
+     *
+     * @param srv Node to check.
+     * @param expLinks Expected segment indexes.
+     */
+    private void checkLinks(IgniteEx srv, List<Long> expLinks) {
+        FileWriteAheadLogManager wal0 = (FileWriteAheadLogManager)srv.context().cache().context().wal(true);
+
+        File[] links = wal0.walCdcDirectory().listFiles(WAL_SEGMENT_FILE_FILTER);
+
+        assertEquals(expLinks.size(), links.length);
+
+        // The result of allMatch() must be asserted, otherwise the indexes are never verified.
+        assertTrue(Arrays.stream(links).map(File::toPath).map(CdcMain::segmentIndex)
+            .allMatch(expLinks::contains));
+    }
+
+    /** Archive given segments links with possible gaps. */
+    private void archiveSegmentLinks(List<Long> idxs) throws Exception {
+        long maxIdx = idxs.stream().mapToLong(v -> v).max().getAsLong();
+
+        for (long idx = 0; idx <= maxIdx; idx++) {
+            // CDC is disabled for every segment that is not in the list, which produces the gaps.
+            cdcDisabled.propagate(!idxs.contains(idx));
+
+            archiveSegment();
+        }
+    }
+
+    /** Writes data and waits until every started node reports an archived WAL segment. */
+    private void archiveSegment() throws Exception {
+        CountDownLatch latch = new CountDownLatch(G.allGrids().size());
+
+        for (Ignite srv : G.allGrids()) {
+            srv.events().localListen(evt -> {
+                latch.countDown();
+
+                return false;
+            }, EVT_WAL_SEGMENT_ARCHIVED);
+        }
+
+        addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        // A timeout here means a segment was not archived, so later link checks would be meaningless.
+        assertTrue(latch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+    }
+}
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
index 3dc6ed6e214..e3555a2e310 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
@@ -314,6 +314,21 @@ public abstract class GridCommandHandlerAbstractTest 
extends GridCommonAbstractT
         return execute(new ArrayList<>(asList(args)));
     }
 
+    /**
+     * Runs the control utility with the given arguments and verifies its exit code.
+     *
+     * @param expExitCode Exit code the command is expected to finish with.
+     * @param args Command line arguments.
+     * @return Content of {@link #testOut} after the command has been executed.
+     */
+    protected String executeCommand(int expExitCode, String... args) {
+        assertEquals(expExitCode, execute(args));
+
+        return testOut.toString();
+    }
+
     /**
      * Before command executed {@link #testOut} reset.
      *
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
index d40f05bfcbb..8bf760eed51 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
@@ -24,7 +24,6 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.List;
@@ -92,6 +91,7 @@ import org.junit.Test;
 
 import static java.util.Arrays.asList;
 import static java.util.Arrays.stream;
+import static java.util.Collections.singletonList;
 import static java.util.Objects.nonNull;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND;
@@ -104,6 +104,7 @@ import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK
 import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
 import static 
org.apache.ignite.internal.commandline.CommandHandler.UTILITY_NAME;
 import static org.apache.ignite.internal.commandline.CommandList.BASELINE;
+import static org.apache.ignite.internal.commandline.CommandList.CDC;
 import static org.apache.ignite.internal.commandline.CommandList.CONSISTENCY;
 import static org.apache.ignite.internal.commandline.CommandList.METADATA;
 import static 
org.apache.ignite.internal.commandline.CommandList.TRACING_CONFIGURATION;
@@ -115,6 +116,8 @@ import static 
org.apache.ignite.internal.commandline.cache.CacheDestroy.CACHE_NA
 import static 
org.apache.ignite.internal.commandline.cache.CacheDestroy.DESTROY_ALL_ARG;
 import static 
org.apache.ignite.internal.commandline.cache.CacheSubcommands.DESTROY;
 import static 
org.apache.ignite.internal.commandline.cache.CacheSubcommands.HELP;
+import static 
org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
+import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
 import static 
org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.CACHE;
 import static 
org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.PARTITIONS;
 import static 
org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.STRATEGY;
@@ -1729,11 +1732,12 @@ public class GridCommandHandlerClusterByClassTest 
extends GridCommandHandlerClus
 
         cmdArgs.put(WAL, asList(new String[] {"print"}, new String[] 
{"delete"}));
         cmdArgs.put(METADATA, asList(new String[] {"help"}, new String[] 
{"list"}));
-        cmdArgs.put(TRACING_CONFIGURATION, Collections.singletonList(new 
String[] {"get_all"}));
+        cmdArgs.put(TRACING_CONFIGURATION, singletonList(new String[] 
{"get_all"}));
         cmdArgs.put(CONSISTENCY, asList(
             new String[] {"repair", CACHE, "cache", PARTITIONS, "0", STRATEGY, 
"LWW"},
             new String[] {"status"},
             new String[] {"finalize"}));
+        cmdArgs.put(CDC, singletonList(new String[] 
{DELETE_LOST_SEGMENT_LINKS, NODE_ID, UUID.randomUUID().toString()}));
 
         String warning = String.format(
             "To use experimental command add --enable-experimental parameter 
for %s",
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java
index b173fad8ddb..ae5b01e07ee 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java
@@ -75,7 +75,7 @@ public class MetricCommandTest extends 
GridCommandHandlerClusterByClassAbstractT
     @Test
     public void testNodeIdMissedFailure() {
         assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, 
CMD_METRIC, SYS_METRICS, NODE_ID.argName()),
-            "ID of the node from which metric values should be obtained is 
expected.");
+            "Expecting " + NODE_ID.argName() + " command argument.");
     }
 
     /** Tests command error output in case value of {@link 
MetricCommandArg#NODE_ID} argument is invalid.*/
@@ -465,19 +465,4 @@ public class MetricCommandTest extends 
GridCommandHandlerClusterByClassAbstractT
         
         return res;
     }
-
-    /**
-     * Executes command and checks its exit code.
-     *
-     * @param expExitCode Expected exit code.
-     * @param args Command lines arguments.
-     * @return Result of command execution.
-     */
-    private String executeCommand(int expExitCode, String... args) {
-        int res = execute(args);
-
-        assertEquals(expExitCode, res);
-
-        return testOut.toString();
-    }
 }
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java
index 67ce487de1e..bf788fc6b27 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java
@@ -1303,19 +1303,4 @@ public class SystemViewCommandTest extends 
GridCommandHandlerClusterByClassAbstr
 
         return res;
     }
-
-    /**
-     * Executes command and checks its exit code.
-     *
-     * @param expExitCode Expected exit code.
-     * @param args Command lines arguments.
-     * @return Result of command execution.
-     */
-    private String executeCommand(int expExitCode, String... args) {
-        int res = execute(args);
-
-        assertEquals(expExitCode, res);
-
-        return testOut.toString();
-    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 35d5a82023c..b8a1bd76790 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -438,12 +438,12 @@ public class CdcMain implements Runnable {
                         // Need unseen WAL segments only.
                         .filter(p -> 
WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
                         .peek(seen::add) // Adds to seen.
-                        .sorted(Comparator.comparingLong(this::segmentIndex)) 
// Sort by segment index.
+                        
.sorted(Comparator.comparingLong(CdcMain::segmentIndex)) // Sort by segment 
index.
                         .peek(p -> {
                             long nextSgmnt = segmentIndex(p);
 
                             if (lastSgmnt.get() != -1 && nextSgmnt - 
lastSgmnt.get() != 1) {
-                                throw new IgniteException("Found missed 
segments. Some events are missed. " +
+                                throw new IgniteException("Found missed 
segments. Some events are missed. Exiting! " +
                                     "[lastSegment=" + lastSgmnt.get() + ", 
nextSegment=" + nextSgmnt + ']');
                             }
 
@@ -811,7 +811,7 @@ public class CdcMain implements Runnable {
      * @param segment WAL segment file.
      * @return Segment index.
      */
-    public long segmentIndex(Path segment) {
+    public static long segmentIndex(Path segment) {
         String fn = segment.getFileName().toString();
 
         return Long.parseLong(fn.substring(0, fn.indexOf('.')));
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 619a84ef0fe..1c07f5e279c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -3301,6 +3301,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         return len;
     }
 
+    /** @return WAL CDC directory (including consistent ID as subfolder), or {@code null} if it is not set. */
+    @Nullable public File walCdcDirectory() {
+        return walCdcDir;
+    }
+
     /**
      * Check if WAL archive is unlimited.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java
new file mode 100644
index 00000000000..b63e18d757a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.cdc.CdcFileLockHolder;
+import org.apache.ignite.internal.cdc.CdcMain;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.cdc.CdcConsumerState.WAL_STATE_FILE_NAME;
+import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+
+/**
+ * Visor task that deletes lost segment CDC links, i.e. links to WAL segments preceding the last gap in indexes.
+ */
+@GridInternal
+public class VisorCdcDeleteLostSegmentsTask extends VisorMultiNodeTask<Void, Void, Void> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorJob<Void, Void> job(Void arg) {
+        return new VisorCdcDeleteLostSegmentsJob(arg, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable Void reduce0(List<ComputeJobResult> results) throws IgniteException {
+        for (ComputeJobResult res : results) {
+            // Fail the whole task on the first failed job, keeping the job exception as the cause.
+            if (res.getException() != null) {
+                throw new IgniteException("Failed to delete lost segment CDC links on a node " +
+                    "[nodeId=" + res.getNode().id() + ']', res.getException());
+            }
+        }
+
+        return null;
+    }
+
+    /** Job that deletes lost segment CDC links in the CDC directory of the node it is executed on. */
+    private static class VisorCdcDeleteLostSegmentsJob extends VisorJob<Void, Void> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Injected logger. */
+        @LoggerResource
+        protected IgniteLogger log;
+
+        /**
+         * Create job with specified argument.
+         *
+         * @param arg   Job argument.
+         * @param debug Flag indicating whether debug information should be printed into node log.
+         */
+        protected VisorCdcDeleteLostSegmentsJob(Void arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Void run(Void arg) throws IgniteException {
+            FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ignite.context().cache().context().wal(true);
+
+            File walCdcDir = wal.walCdcDirectory();
+
+            if (walCdcDir == null)
+                throw new IgniteException("CDC is not configured.");
+
+            CdcFileLockHolder lock = new CdcFileLockHolder(walCdcDir.getAbsolutePath(), "Delete lost segments job", log);
+
+            try {
+                lock.tryLock(1);
+
+                try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) {
+                    Set<File> delete = new HashSet<>();
+
+                    AtomicLong lastSgmnt = new AtomicLong(-1);
+
+                    // Walk segments from the highest index down: links are kept while indexes stay contiguous,
+                    // and every link found after the first (i.e. the last by index) gap is marked for deletion.
+                    cdcFiles
+                        .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
+                        .sorted(Comparator.comparingLong(CdcMain::segmentIndex).reversed())
+                        .forEach(path -> {
+                            long idx = CdcMain.segmentIndex(path);
+
+                            if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1)
+                                lastSgmnt.set(idx);
+                            else
+                                delete.add(path.toFile());
+                        });
+
+                    if (delete.isEmpty()) {
+                        log.info("Lost segment CDC links were not found.");
+
+                        return null;
+                    }
+
+                    log.info("Found lost segment CDC links. The following links will be deleted: " + delete);
+
+                    delete.forEach(file -> {
+                        if (!file.delete()) {
+                            throw new IgniteException("Failed to delete lost segment CDC link [file=" +
+                                file.getAbsolutePath() + ']');
+                        }
+
+                        log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']');
+                    });
+
+                    Path stateDir = walCdcDir.toPath().resolve(STATE_DIR);
+
+                    // NOTE(review): the saved WAL state is dropped too - presumably it may refer to a deleted segment.
+                    if (stateDir.toFile().exists()) {
+                        File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile();
+
+                        if (walState.exists() && !walState.delete()) {
+                            throw new IgniteException("Failed to delete wal state file [file=" +
+                                walState.getAbsolutePath() + ']');
+                        }
+                    }
+                }
+                catch (IOException e) {
+                    throw new IgniteException("Failed to delete lost segment CDC links.", e);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException("Failed to delete lost segment CDC links. " +
+                    "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " +
+                    "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']', e);
+            }
+            finally {
+                U.closeQuiet(lock);
+            }
+
+            return null;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index a05d89fccc9..3d456f2e729 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -62,6 +62,8 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.RunnableX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -791,6 +793,20 @@ public class CdcSelfTest extends AbstractCdcTest {
 
         assertThrows(log, () -> fut.get(getTestTimeout()), 
IgniteCheckedException.class,
             "Found missed segments. Some events are missed.");
+
+        ign.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new 
VisorTaskArgument<>(ign.localNode().id(), false));
+
+        cnsmr.data.clear();
+
+        cdc = createCdc(cnsmr, getConfiguration(ign.name()));
+
+        IgniteInternalFuture<?> f = runAsync(cdc);
+
+        waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
+        assertFalse(f.isDone());
+
+        f.cancel();
     }
 
     /** */
diff --git 
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
 
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
index 08f1081d34c..0f27ca2680b 100644
--- 
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
+++ 
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
@@ -343,6 +343,13 @@ If the file name isn't specified the output file name is: 
'<typeId>.bin'
   Finalize partitions update counters:
     control.(sh|bat) --consistency finalize
 
+  [EXPERIMENTAL]
+  Delete lost segment CDC links:
+    control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] 
[--yes]
+
+    Parameters:
+      node_id  - ID of the node to delete lost segment links from. If not set, 
the command will affect all server nodes.
+
 By default commands affecting the cluster require interactive confirmation.
 Use --yes option to disable it.
 
diff --git 
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
 
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
index 08f1081d34c..0f27ca2680b 100644
--- 
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
+++ 
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
@@ -343,6 +343,13 @@ If the file name isn't specified the output file name is: 
'<typeId>.bin'
   Finalize partitions update counters:
     control.(sh|bat) --consistency finalize
 
+  [EXPERIMENTAL]
+  Delete lost segment CDC links:
+    control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] 
[--yes]
+
+    Parameters:
+      node_id  - ID of the node to delete lost segment links from. If not set, 
the command will affect all server nodes.
+
 By default commands affecting the cluster require interactive confirmation.
 Use --yes option to disable it.
 

Reply via email to