This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c79a62f2d60 IGNITE-18425 Added CDC command to forcefully resend cache
data (#10524)
c79a62f2d60 is described below
commit c79a62f2d60667113dd77ba21b6d41d91f0e1864
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Apr 7 10:51:05 2023 +0300
IGNITE-18425 Added CDC command to forcefully resend cache data (#10524)
---
docs/_docs/persistence/change-data-capture.adoc | 22 +-
.../internal/commandline/cdc/CdcCommand.java | 97 +------
.../internal/commandline/cdc/CdcSubcommands.java | 66 +++++
...and.java => DeleteLostSegmentLinksCommand.java} | 23 +-
.../internal/commandline/cdc/ResendCommand.java | 115 ++++++++
.../commandline/CommandHandlerParsingTest.java | 4 +-
.../testsuites/IgniteControlUtilityTestSuite2.java | 4 +-
.../org/apache/ignite/util/CdcCommandTest.java | 321 ++++++++++++++++++++-
.../apache/ignite/util/CdcResendCommandTest.java | 97 +++++++
.../util/GridCommandHandlerClusterByClassTest.java | 4 +-
.../org/apache/ignite/internal/cdc/CdcMain.java | 3 +-
.../internal/pagemem/wal/record/CdcDataRecord.java | 40 +++
.../internal/pagemem/wal/record/WALRecord.java | 5 +-
.../persistence/wal/FileWriteAheadLogManager.java | 3 +-
.../wal/reader/StandaloneWalRecordsIterator.java | 1 +
.../wal/serializer/RecordDataV1Serializer.java | 3 +-
.../wal/serializer/RecordDataV2Serializer.java | 3 +
.../visor/cdc/VisorCdcCacheDataResendTask.java | 248 ++++++++++++++++
.../visor/cdc/VisorCdcCacheDataResendTaskArg.java | 59 ++++
.../org/apache/ignite/cdc/AbstractCdcTest.java | 10 +-
.../testframework/wal/record/RecordUtils.java | 2 +
...ridCommandHandlerClusterByClassTest_help.output | 9 +-
...andHandlerClusterByClassWithSSLTest_help.output | 9 +-
23 files changed, 1029 insertions(+), 119 deletions(-)
diff --git a/docs/_docs/persistence/change-data-capture.adoc
b/docs/_docs/persistence/change-data-capture.adoc
index f04fffaee55..846e389cb5a 100644
--- a/docs/_docs/persistence/change-data-capture.adoc
+++ b/docs/_docs/persistence/change-data-capture.adoc
@@ -152,7 +152,7 @@ So when enabled there will be gap between segments:
`0000000000000002.wal`, `000
In this case `ignite-cdc.sh` will fail with the something like "Found missed
segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]".
NOTE: Make sure you need to sync data before restarting the CDC application.
You can synchronize caches using
-snapshot or other methods.
+link:#forcefully-resend-all-cache-data-to-cdc[resend command], snapshot or
other methods.
To fix this error you can run the following link:tools/control-script[Control
Script] command:
@@ -171,6 +171,26 @@ For example, CDC was turned off several times:
`000000000000002.wal`, `000000000
Then, after the command is executed, the following segment links will be
deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`.
The application will start from the `0000000000000010.wal` segment after being
enabled.
+== Forcefully resend all cache data to CDC
+
+When the CDC has been forcefully disabled for a while, cache changes will be
skipped.
+In this case it is necessary to resend data from existing caches.
+For example, this is important if you need to ensure consistency of cache data
before a replication restart.
+
+NOTE: The command will be canceled if cluster was not rebalanced or topology
changed (node left/joined, baseline changed).
+
+To forcefully resend all cache data to CDC you can run the following
link:tools/control-script[Control Script] command:
+
+[source,shell]
+----
+# Forcefully resend all cache data to CDC. Iterates over caches and writes
primary copies of data entries to the WAL to get captured by CDC:
+control.sh|bat --cdc resend --caches cache1,...,cacheN
+----
+
+The command will iterate over caches and writes primary copies of data entries
to the WAL to get captured by the CDC application.
+
+NOTE: There are no guarantees of notifying the CDC consumer on concurrent
cache updates: use the `CdcEvent#version` to resolve version.
+
== cdc-ext
Ignite extensions project has
link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext]
module which provides two way to setup cross cluster replication based on CDC.
diff --git
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
index 8dbd2b963ab..9d7352d1a7b 100644
---
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
+++
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
@@ -17,115 +17,44 @@
package org.apache.ignite.internal.commandline.cdc;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientConfiguration;
-import org.apache.ignite.internal.client.GridClientNode;
import org.apache.ignite.internal.commandline.AbstractCommand;
-import org.apache.ignite.internal.commandline.Command;
import org.apache.ignite.internal.commandline.CommandArgIterator;
-import org.apache.ignite.internal.commandline.CommandLogger;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.visor.VisorTaskArgument;
-import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;
-
-import static org.apache.ignite.internal.commandline.CommandList.CDC;
-import static org.apache.ignite.internal.commandline.CommandLogger.optional;
-import static
org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION;
-import static
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
/**
* CDC command.
*/
-public class CdcCommand extends AbstractCommand<String> {
- /** Command to delete lost segment links. */
- public static final String DELETE_LOST_SEGMENT_LINKS =
"delete_lost_segment_links";
-
- /** */
- public static final String NODE_ID = "--node-id";
-
- /** Node ID. */
- private UUID nodeId;
+public class CdcCommand extends AbstractCommand<Object> {
+ /** Cdc sub-command to execute. */
+ private AbstractCommand<?> cmd;
/** {@inheritDoc} */
@Override public Object execute(GridClientConfiguration clientCfg,
IgniteLogger log) throws Exception {
- try (GridClient client = Command.startClient(clientCfg)) {
- executeTaskByNameOnNode(
- client,
- VisorCdcDeleteLostSegmentsTask.class.getName(),
- null,
- nodeId,
- clientCfg
- );
-
- Collection<UUID> nodeIds = nodeId != null ?
Collections.singletonList(nodeId) :
- client.compute().nodes(node ->
!node.isClient()).stream().map(GridClientNode::nodeId)
- .collect(Collectors.toSet());
-
-
client.compute().execute(VisorCdcDeleteLostSegmentsTask.class.getName(),
- new VisorTaskArgument<>(nodeIds, false));
-
- String res = "Lost segment CDC links successfully removed.";
-
- log.info(res);
-
- return res;
- }
- catch (Throwable e) {
- log.error("Failed to perform operation.");
- log.error(CommandLogger.errorMessage(e));
-
- throw e;
- }
+ return cmd.execute(clientCfg, log);
}
/** {@inheritDoc} */
@Override public void parseArguments(CommandArgIterator argIter) {
- nodeId = null;
-
- String cmd = argIter.nextArg("Expected command: " +
DELETE_LOST_SEGMENT_LINKS);
-
- if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd))
- throw new IllegalArgumentException("Unexpected command: " + cmd);
-
- while (argIter.hasNextSubArg()) {
- String opt = argIter.nextArg("Failed to read command argument.");
+ cmd = CdcSubcommands.of(argIter.nextArg("Expected CDC
sub-command.")).subCommand();
- if (NODE_ID.equalsIgnoreCase(opt))
- nodeId = argIter.nextUuidArg(NODE_ID);
- }
+ cmd.parseArguments(argIter);
}
/** {@inheritDoc} */
- @Override public String confirmationPrompt() {
- return "Warning: The command will fix WAL segments gap in case CDC
link creation was stopped by distributed " +
- "property or excess of maximum CDC directory size. Gap will be
fixed by deletion of WAL segment links" +
- "previous to the last gap." + U.nl() +
- "All changes in deleted segment links will be lost!" + U.nl() +
- "Make sure you need to sync data before restarting the CDC
application. You can synchronize caches " +
- "using snapshot or other methods.";
+ @Override public Object arg() {
+ return cmd.arg();
}
/** {@inheritDoc} */
- @Override public String arg() {
- return null;
+ @Override public String confirmationPrompt() {
+ return cmd == null ? null : cmd.confirmationPrompt();
}
/** {@inheritDoc} */
- @Override public void printUsage(IgniteLogger logger) {
- Map<String, String> params = new LinkedHashMap<>();
-
- params.put("node_id", "ID of the node to delete lost segment links
from. If not set, the command will affect " +
- "all server nodes.");
-
- usage(logger, "Delete lost segment CDC links:", CDC, params,
DELETE_LOST_SEGMENT_LINKS,
- optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION));
+ @Override public void printUsage(IgniteLogger log) {
+ for (CdcSubcommands cmd : CdcSubcommands.values())
+ cmd.subCommand().printUsage(log);
}
/** {@inheritDoc} */
diff --git
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java
new file mode 100644
index 00000000000..a82d4ec6768
--- /dev/null
+++
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.cdc;
+
+import org.apache.ignite.internal.commandline.AbstractCommand;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * CDC sub commands.
+ */
+public enum CdcSubcommands {
+ /** Sub-command to delete lost segment links. */
+ DELETE_LOST_SEGMENT_LINKS(new DeleteLostSegmentLinksCommand()),
+
+ /** Sub-command to forcefully resend cache data. */
+ RESEND(new ResendCommand());
+
+ /** Sub-command. */
+ private final AbstractCommand<?> cmd;
+
+ /** @param cmd Sub-command. */
+ CdcSubcommands(AbstractCommand<?> cmd) {
+ this.cmd = cmd;
+ }
+
+ /**
+ * @param name Command name (case insensitive).
+ * @return Command for the specified name.
+ */
+ public static CdcSubcommands of(String name) {
+ CdcSubcommands[] cmds = values();
+
+ for (CdcSubcommands cmd : cmds) {
+ if (cmd.subCommand().name().equalsIgnoreCase(name))
+ return cmd;
+ }
+
+ throw new IllegalArgumentException(
+ "Invalid argument: " + name + ". One of " + F.asList(cmds) + " is
expected.");
+ }
+
+ /** @return Sub-command. */
+ public AbstractCommand<?> subCommand() {
+ return cmd;
+ }
+
+ /** @return Sub-command name. */
+ @Override public String toString() {
+ return cmd.name();
+ }
+}
diff --git
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java
similarity index 91%
copy from
modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
copy to
modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java
index 8dbd2b963ab..12bca8c9711 100644
---
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
+++
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java
@@ -41,9 +41,9 @@ import static
org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CO
import static
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
/**
- * CDC command.
+ * Command to delete lost segment links.
*/
-public class CdcCommand extends AbstractCommand<String> {
+public class DeleteLostSegmentLinksCommand extends AbstractCommand<Object> {
/** Command to delete lost segment links. */
public static final String DELETE_LOST_SEGMENT_LINKS =
"delete_lost_segment_links";
@@ -89,11 +89,6 @@ public class CdcCommand extends AbstractCommand<String> {
@Override public void parseArguments(CommandArgIterator argIter) {
nodeId = null;
- String cmd = argIter.nextArg("Expected command: " +
DELETE_LOST_SEGMENT_LINKS);
-
- if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd))
- throw new IllegalArgumentException("Unexpected command: " + cmd);
-
while (argIter.hasNextSubArg()) {
String opt = argIter.nextArg("Failed to read command argument.");
@@ -112,25 +107,25 @@ public class CdcCommand extends AbstractCommand<String> {
"using snapshot or other methods.";
}
- /** {@inheritDoc} */
- @Override public String arg() {
- return null;
- }
-
/** {@inheritDoc} */
@Override public void printUsage(IgniteLogger logger) {
Map<String, String> params = new LinkedHashMap<>();
- params.put("node_id", "ID of the node to delete lost segment links
from. If not set, the command will affect " +
+ params.put(NODE_ID + " node_id", "ID of the node to delete lost
segment links from. If not set, the command will affect " +
"all server nodes.");
usage(logger, "Delete lost segment CDC links:", CDC, params,
DELETE_LOST_SEGMENT_LINKS,
optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION));
}
+ /** {@inheritDoc} */
+ @Override public Object arg() {
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public String name() {
- return "cdc";
+ return DELETE_LOST_SEGMENT_LINKS;
}
/** {@inheritDoc} */
diff --git
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java
new file mode 100644
index 00000000000..1ba381f347a
--- /dev/null
+++
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.cdc;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.commandline.AbstractCommand;
+import org.apache.ignite.internal.commandline.Command;
+import org.apache.ignite.internal.commandline.CommandArgIterator;
+import org.apache.ignite.internal.commandline.CommandLogger;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.cdc.VisorCdcCacheDataResendTask;
+import org.apache.ignite.internal.visor.cdc.VisorCdcCacheDataResendTaskArg;
+
+import static org.apache.ignite.internal.commandline.CommandList.CDC;
+import static
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
+
+/**
+ * The command to forcefully resend all cache data to CDC.
+ * Iterates over given caches and writes data entries to the WAL to get
captured by CDC.
+ */
+public class ResendCommand extends AbstractCommand<Object> {
+ /** Command name. */
+ public static final String RESEND = "resend";
+
+ /** */
+ public static final String CACHES = "--caches";
+
+ /** */
+ private VisorCdcCacheDataResendTaskArg arg;
+
+ /** {@inheritDoc} */
+ @Override public Object execute(GridClientConfiguration clientCfg,
IgniteLogger log) throws Exception {
+ try (GridClient client = Command.startClient(clientCfg)) {
+ executeTaskByNameOnNode(client,
VisorCdcCacheDataResendTask.class.getName(), arg, null, clientCfg);
+
+ String res = "Successfully resent all cache data to CDC.";
+
+ log.info(res);
+
+ return res;
+ }
+ catch (Throwable e) {
+ log.error("Failed to perform operation.");
+ log.error(CommandLogger.errorMessage(e));
+
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void parseArguments(CommandArgIterator argIter) {
+ Set<String> caches = null;
+
+ while (argIter.hasNextSubArg()) {
+ String opt = argIter.nextArg("Failed to read command argument.");
+
+ if (CACHES.equalsIgnoreCase(opt)) {
+ if (caches != null)
+ throw new IllegalArgumentException(CACHES + " arg
specified twice.");
+
+ caches = argIter.nextStringSet("comma-separated list of cache
names.");
+ }
+ }
+
+ if (F.isEmpty(caches))
+ throw new IllegalArgumentException("At least one cache name should
be specified.");
+
+ arg = new VisorCdcCacheDataResendTaskArg(caches);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printUsage(IgniteLogger logger) {
+ Map<String, String> params = new LinkedHashMap<>();
+
+ params.put(CACHES + " cache1,...,cacheN", "specifies a comma-separated
list of cache names.");
+
+ usage(logger, "Forcefully resend all cache data to CDC. Iterates over
caches and writes primary copies " +
+ "of data entries to the WAL to get captured by CDC:", CDC, params,
RESEND, CACHES, "cache1,...,cacheN");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object arg() {
+ return arg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return RESEND;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean experimental() {
+ return true;
+ }
+}
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
index b06ac82fb4b..3a24c7a59ce 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
@@ -79,8 +79,8 @@ import static
org.apache.ignite.internal.commandline.cache.CacheSubcommands.FIND
import static
org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES;
import static
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST;
import static
org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH;
-import static
org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
+import static
org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS;
+import static
org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
index ce8571c4ef4..409a75db2a8 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.commandline.indexreader.IgniteIndexReaderTest;
import org.apache.ignite.util.CacheMetricsCommandTest;
import org.apache.ignite.util.CdcCommandTest;
+import org.apache.ignite.util.CdcResendCommandTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest;
import
org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest;
@@ -67,7 +68,8 @@ import org.junit.runners.Suite;
IgniteIndexReaderTest.class,
- CdcCommandTest.class
+ CdcCommandTest.class,
+ CdcResendCommandTest.class
})
public class IgniteControlUtilityTestSuite2 {
}
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
index b04090e485b..4604fccd878 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
@@ -23,40 +23,66 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
-import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cdc.AbstractCdcTest.UserCdcConsumer;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridJobExecuteRequest;
+import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.commandline.CommandList;
+import org.apache.ignite.internal.commandline.cdc.CdcSubcommands;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
import static org.apache.ignite.cdc.CdcSelfTest.addData;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
-import static
org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
+import static
org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS;
+import static
org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.CACHES;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.RESEND;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.GridTestUtils.stopThreads;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* CDC command tests.
*/
public class CdcCommandTest extends GridCommandHandlerAbstractTest {
+ /** */
+ private static final String CDC_DISABLED_DATA_REGION =
"cdc_disabled_data_region";
+
/** */
private IgniteEx srv0;
@@ -66,6 +92,9 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
/** */
private DistributedChangeableProperty<Serializable> cdcDisabled;
+ /** */
+ private volatile IgniteThrowableConsumer<WALRecord> onLogLsnr;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -75,11 +104,34 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(1000)
+ .setDataRegionConfigurations(new DataRegionConfiguration()
+ .setName(CDC_DISABLED_DATA_REGION)
+ .setCdcEnabled(false))
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setCdcEnabled(true)));
cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED);
+ cfg.setPluginProviders(new AbstractTestPluginProvider() {
+ @Override public String name() {
+ return "Test WAL provider";
+ }
+
+ @Override public <T> @Nullable T createComponent(PluginContext
ctx, Class<T> cls) {
+ if (!IgniteWriteAheadLogManager.class.equals(cls))
+ return null;
+
+ return (T)new
FileWriteAheadLogManager(((IgniteEx)ctx.grid()).context()) {
+ @Override public WALPointer log(WALRecord rec) throws
IgniteCheckedException {
+ if (rec instanceof CdcDataRecord && onLogLsnr != null)
+ onLogLsnr.accept(rec);
+
+ return super.log(rec);
+ }
+ };
+ }
+ });
+
return cfg;
}
@@ -92,6 +144,8 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
srv0 = startGrid(0);
srv1 = startGrid(1);
+ awaitPartitionMapExchange();
+
cdcDisabled =
srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED);
}
@@ -99,6 +153,8 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
@Override protected void afterTest() throws Exception {
super.afterTest();
+ stopThreads(log);
+
stopAllGrids();
cleanPersistenceDir();
@@ -111,7 +167,7 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
CommandList.CDC.text(), "unexpected_command"),
- "Unexpected command: unexpected_command");
+ "Invalid argument: unexpected_command. One of " +
F.asList(CdcSubcommands.values()) + " is expected.");
assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID),
@@ -131,7 +187,7 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
CdcConfiguration cfg = new CdcConfiguration();
- cfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() {
+ cfg.setConsumer(new UserCdcConsumer() {
@Override public void start(MetricRegistry mreg) {
appStarted.countDown();
}
@@ -148,8 +204,6 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
"Failed to delete lost segment CDC links. Unable to acquire lock
to lock CDC folder.");
assertFalse(fut.isDone());
-
- fut.cancel();
}
/** */
@@ -222,4 +276,257 @@ public class CdcCommandTest extends
GridCommandHandlerAbstractTest {
latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
}
+
+ /** */
+ @Test
+ public void testParseResend() {
+ injectTestSystemOut();
+
+ assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+ CommandList.CDC.text(), "unexpected_command"),
+ "Invalid argument: unexpected_command. One of " +
F.asList(CdcSubcommands.values()) + " is expected.");
+
+ assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+ CommandList.CDC.text(), RESEND),
+ "At least one cache name should be specified.");
+
+ assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+ CommandList.CDC.text(), RESEND, CACHES),
+ "At least one cache name should be specified.");
+ }
+
+ /** */
+ @Test
+ public void testResendCacheData() throws Exception {
+ UserCdcConsumer cnsmr0 = runCdc(srv0);
+ UserCdcConsumer cnsmr1 = runCdc(srv1);
+
+ addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+ waitForSize(cnsmr0,
srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+ waitForSize(cnsmr1,
srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+
+ cnsmr0.clear();
+ cnsmr1.clear();
+
+ executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES,
DEFAULT_CACHE_NAME);
+
+ waitForSize(cnsmr0,
srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+ waitForSize(cnsmr1,
srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+ }
+
+ /** */
+ @Test
+ public void testResendCachesNotExist() {
+ injectTestSystemOut();
+
+ assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+ CommandList.CDC.text(), RESEND, CACHES, "unknown_cache"),
+ "Cache does not exist");
+
+ String cdcDisabledCacheName = "cdcDisabledCache";
+
+ srv0.getOrCreateCache(new CacheConfiguration<>()
+ .setName(cdcDisabledCacheName)
+ .setDataRegionName(CDC_DISABLED_DATA_REGION));
+
+ assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+ CommandList.CDC.text(), RESEND, CACHES, cdcDisabledCacheName),
+ "CDC is not enabled for given cache");
+ }
+
+ /** */
+ @Test
+ public void testResendCancelOnNodeLeft() {
+ injectTestSystemOut();
+
+ addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+ for (Ignite srv : G.allGrids()) {
+ TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg)
-> {
+ if (msg instanceof GridJobExecuteResponse) {
+ GridTestUtils.runAsync(srv::close);
+
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+ CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME),
+ "CDC cache data resend cancelled. Failed to resend cache data on
the node");
+ }
+
+ /** */
+ @Test
+ public void testResendCancelOnRebalanceInProgress() throws Exception {
+ injectTestSystemOut();
+
+ addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+ CountDownLatch rebalanceStarted = new CountDownLatch(1);
+
+ for (Ignite srv : G.allGrids()) {
+ TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg)
-> {
+ if (msg instanceof GridDhtPartitionSupplyMessage) {
+ rebalanceStarted.countDown();
+
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ GridTestUtils.runAsync(() -> startGrid(3));
+
+ rebalanceStarted.await();
+
+ assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+ CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME),
+ "CDC cache data resend cancelled. Rebalance sheduled");
+ }
+
+ /** */
+ @Test
+ public void testResendCancelOnTopologyChangeBeforeStart() throws Exception
{
+ injectTestSystemOut();
+
+ addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+ CountDownLatch blocked = new CountDownLatch(1);
+
+ for (Ignite srv : G.allGrids()) {
+ TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg)
-> {
+ if (msg instanceof GridJobExecuteRequest) {
+ blocked.countDown();
+
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
+ assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+ CommandList.CDC.text(), RESEND, CACHES,
DEFAULT_CACHE_NAME),
+ "CDC cache data resend cancelled. Topology changed");
+ });
+
+ blocked.await();
+
+ startGrid(3);
+ awaitPartitionMapExchange();
+
+ for (Ignite srv : G.allGrids())
+ TestRecordingCommunicationSpi.spi(srv).stopBlock();
+
+ fut.get();
+ }
+
+ /** */
+ @Test
+ public void testResendCancelOnTopologyChange() throws Exception {
+ injectTestSystemOut();
+
+ addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+ CountDownLatch preload = new CountDownLatch(1);
+ CountDownLatch topologyChanged = new CountDownLatch(1);
+
+ AtomicInteger cnt = new AtomicInteger();
+
+ onLogLsnr = rec -> {
+ if (cnt.incrementAndGet() < KEYS_CNT / 2)
+ return;
+
+ preload.countDown();
+
+ U.await(topologyChanged);
+ };
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
+ assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+ CommandList.CDC.text(), RESEND, CACHES,
DEFAULT_CACHE_NAME),
+ "CDC cache data resend cancelled. Topology changed");
+ });
+
+ preload.await();
+
+ startGrid(3);
+
+ topologyChanged.countDown();
+
+ fut.get();
+ }
+
+ /** */
+ @Test
+ public void testResendOnClientJoin() throws Exception {
+ UserCdcConsumer cnsmr0 = runCdc(srv0);
+ UserCdcConsumer cnsmr1 = runCdc(srv1);
+
+ addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+ waitForSize(cnsmr0,
srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+ waitForSize(cnsmr1,
srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+
+ cnsmr0.clear();
+ cnsmr1.clear();
+
+ CountDownLatch blocked = new CountDownLatch(1);
+
+ for (Ignite srv : G.allGrids()) {
+ TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg)
-> {
+ if (msg instanceof GridJobExecuteRequest) {
+ blocked.countDown();
+
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
+ executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND,
CACHES, DEFAULT_CACHE_NAME);
+ });
+
+ blocked.await();
+
+ startClientGrid("client");
+
+ for (Ignite srv : G.allGrids())
+ TestRecordingCommunicationSpi.spi(srv).stopBlock();
+
+ fut.get();
+
+ waitForSize(cnsmr0,
srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+ waitForSize(cnsmr1,
srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+ }
+
+ /** */
+ public static UserCdcConsumer runCdc(Ignite ign) {
+ UserCdcConsumer cnsmr = new UserCdcConsumer();
+
+ CdcConfiguration cfg = new CdcConfiguration();
+
+ cfg.setConsumer(cnsmr);
+ cfg.setKeepBinary(false);
+
+ CdcMain cdc = new CdcMain(ign.configuration(), null, cfg);
+
+ GridTestUtils.runAsync(cdc);
+
+ return cnsmr;
+ }
+
+ /** */
+ public static void waitForSize(UserCdcConsumer cnsmr, int expSize) throws
Exception {
+ assertTrue(waitForCondition(() -> expSize == cnsmr.data(UPDATE,
CU.cacheId(DEFAULT_CACHE_NAME)).size(),
+ 60_000));
+ }
}
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
new file mode 100644
index 00000000000..55416388c89
--- /dev/null
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.commandline.CommandList;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
+import static org.apache.ignite.cdc.CdcSelfTest.addData;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.CACHES;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.RESEND;
+import static org.apache.ignite.testframework.GridTestUtils.stopThreads;
+import static org.apache.ignite.util.CdcCommandTest.runCdc;
+import static org.apache.ignite.util.CdcCommandTest.waitForSize;
+
+/**
+ * CDC resend command tests.
+ */
+public class CdcResendCommandTest extends GridCommandHandlerAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setWalForceArchiveTimeout(1000)
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setCdcEnabled(true)
+ .setPersistenceEnabled(true)));
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setBackups(1));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopThreads(log);
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @Test
+ public void testResendCacheDataRestoreFromWal() throws Exception {
+ IgniteEx ign = startGrid(0);
+
+ ign.cluster().state(ACTIVE);
+
+ enableCheckpoints(ign, false);
+
+ addData(ign.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+ AbstractCdcTest.UserCdcConsumer cnsmr = runCdc(ign);
+
+ waitForSize(cnsmr, KEYS_CNT);
+
+ cnsmr.clear();
+
+ executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES,
DEFAULT_CACHE_NAME);
+
+ waitForSize(cnsmr, KEYS_CNT);
+
+ stopAllGrids();
+
+ ign = startGrid(0);
+
+ assertEquals(KEYS_CNT, ign.cache(DEFAULT_CACHE_NAME).size());
+ }
+}
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
index 339247850f2..a3cf746a826 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
@@ -119,8 +119,8 @@ import static
org.apache.ignite.internal.commandline.cache.CacheDestroy.DESTROY_
import static
org.apache.ignite.internal.commandline.cache.CacheSubcommands.CLEAR;
import static
org.apache.ignite.internal.commandline.cache.CacheSubcommands.DESTROY;
import static
org.apache.ignite.internal.commandline.cache.CacheSubcommands.HELP;
-import static
org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
+import static
org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS;
+import static
org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID;
import static
org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.CACHE;
import static
org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.PARTITIONS;
import static
org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.STRATEGY;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 9ff1fc7adb3..3b187a0593d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -79,6 +79,7 @@ import static
org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
import static
org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
import static
org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
+import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
@@ -489,7 +490,7 @@ public class CdcMain implements Runnable {
.marshallerMappingFileStoreDir(marshaller)
.keepBinary(cdcCfg.isKeepBinary())
.filesOrDirs(segment.toFile())
- .addFilter((type, ptr) -> type == DATA_RECORD_V2);
+ .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type ==
CDC_DATA_RECORD);
if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0)
builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java
new file mode 100644
index 00000000000..25e0defe66b
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * The record to forcefully resend cache data to the CDC application.
+ */
+public class CdcDataRecord extends DataRecord {
+ /** */
+ public CdcDataRecord(DataEntry writeEntry) {
+ super(writeEntry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.CDC_DATA_RECORD;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CdcDataRecord.class, this, "super",
super.toString());
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index baa51c35286..3457251e0f4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -288,7 +288,10 @@ public abstract class WALRecord {
INCREMENTAL_SNAPSHOT_START_RECORD(76, LOGICAL),
/** Incremental snapshot finish record. */
- INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL);
+ INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL),
+
+ /** CDC data record. */
+ CDC_DATA_RECORD(78, CUSTOM);
/** Index for serialization. Should be consistent throughout all
versions. */
private final int idx;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index edbc38d5dc3..f68b366fbb6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -152,6 +152,7 @@ import static
org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
@@ -990,7 +991,7 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
// Only data records handled by CDC.
// No need to forcefully rollover for other record types.
- if (walForceArchiveTimeout > 0 && rec.type() ==
DATA_RECORD_V2)
+ if (walForceArchiveTimeout > 0 && (rec.type() ==
DATA_RECORD_V2 || rec.type() == CDC_DATA_RECORD))
lastDataRecordLoggedMs.set(millis);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 051be3616a4..df6ca772291 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -360,6 +360,7 @@ class StandaloneWalRecordsIterator extends
AbstractWalRecordsIterator {
if (processor != null && (rec.type() == RecordType.DATA_RECORD
|| rec.type() == RecordType.DATA_RECORD_V2
+ || rec.type() == RecordType.CDC_DATA_RECORD
|| rec.type() == RecordType.MVCC_DATA_RECORD)) {
try {
return postProcessDataRecord((DataRecord)rec, kernalCtx,
processor);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index f902295dcc9..3907bf2c98f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -126,6 +126,7 @@ import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
@@ -2171,7 +2172,7 @@ public class RecordDataV1Serializer implements
RecordDataSerializer {
int partId = in.readInt();
long partCntr = in.readLong();
long expireTime = in.readLong();
- byte flags = type == DATA_RECORD_V2 ? in.readByte() : (byte)0;
+ byte flags = type == DATA_RECORD_V2 || type == CDC_DATA_RECORD ?
in.readByte() : (byte)0;
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 62fc50ef73f..77cc5eb0cfe 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -91,6 +91,7 @@ public class RecordDataV2Serializer extends
RecordDataV1Serializer {
return 18 + cacheStatesSize + (walPtr == null ? 0 : 16);
case MVCC_DATA_RECORD:
+ case CDC_DATA_RECORD:
return 4/*entry count*/ + 8/*timestamp*/ +
dataSize((DataRecord)rec);
case DATA_RECORD_V2:
@@ -162,6 +163,7 @@ public class RecordDataV2Serializer extends
RecordDataV1Serializer {
case DATA_RECORD:
case DATA_RECORD_V2:
+ case CDC_DATA_RECORD:
int entryCnt = in.readInt();
long timeStamp = in.readLong();
@@ -272,6 +274,7 @@ public class RecordDataV2Serializer extends
RecordDataV1Serializer {
case MVCC_DATA_RECORD:
case DATA_RECORD_V2:
+ case CDC_DATA_RECORD:
DataRecord dataRec = (DataRecord)rec;
int entryCnt = dataRec.entryCount();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java
new file mode 100644
index 00000000000..20f5fe1c82f
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task to forcefully resend all cache data to CDC.
+ * Iterates over caches and writes primary copies of data entries to the WAL
to get captured by CDC.
+ */
+@GridInternal
+public class VisorCdcCacheDataResendTask extends
VisorMultiNodeTask<VisorCdcCacheDataResendTaskArg, Void, Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Topology version when task was started. */
+ private AffinityTopologyVersion topVer;
+
+ /** {@inheritDoc} */
+ @Override protected VisorJob<VisorCdcCacheDataResendTaskArg, Void>
job(VisorCdcCacheDataResendTaskArg arg) {
+ return new VisorCdcCacheDataResendJob(arg, topVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<UUID>
jobNodes(VisorTaskArgument<VisorCdcCacheDataResendTaskArg> arg) {
+ // Check there is no rebalance.
+ GridDhtPartitionsExchangeFuture fut =
ignite.context().cache().context().exchange().lastFinishedFuture();
+
+ if (!fut.rebalanced()) {
+ throw new IgniteException("CDC cache data resend cancelled.
Rebalance sheduled " +
+ "[topVer=" + fut.topologyVersion() + ']');
+ }
+
+ // Cancel resend if affinity will change.
+ topVer =
ignite.context().cache().context().exchange().lastAffinityChangedTopologyVersion(fut.topologyVersion());
+
+ return F.nodeIds(ignite.cluster().forServers().nodes());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected @Nullable Void reduce0(List<ComputeJobResult> results)
throws IgniteException {
+ for (ComputeJobResult res : results) {
+ if (res.getException() != null) {
+ throw new IgniteException("CDC cache data resend cancelled.
Failed to resend cache data " +
+ "on the node [nodeId=" + res.getNode().id() + ']',
res.getException());
+ }
+ }
+
+ return null;
+ }
+
+ /** */
+ private static class VisorCdcCacheDataResendJob extends
VisorJob<VisorCdcCacheDataResendTaskArg, Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Injected logger. */
+ @LoggerResource
+ protected IgniteLogger log;
+
+ /** */
+ private IgniteWriteAheadLogManager wal;
+
+ /** */
+ private GridCachePartitionExchangeManager<Object, Object> exchange;
+
+ /** Topology version when task was started. */
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private GridDhtPartitionsExchangeFuture lastFut;
+
+ /**
+ * @param arg Job argument.
+ * @param topVer Topology version when task was started.
+ */
+ protected VisorCdcCacheDataResendJob(VisorCdcCacheDataResendTaskArg
arg, AffinityTopologyVersion topVer) {
+ super(arg, false);
+
+ this.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Void run(VisorCdcCacheDataResendTaskArg arg)
throws IgniteException {
+ if (F.isEmpty(arg.caches()))
+ throw new IllegalArgumentException("Caches are not
specified.");
+
+ List<IgniteInternalCache<?, ?>> caches = new ArrayList<>();
+
+ for (String name : arg.caches()) {
+ IgniteInternalCache<?, ?> cache =
ignite.context().cache().cache(name);
+
+ if (cache == null)
+ throw new IgniteException("Cache does not exist
[cacheName=" + name + ']');
+
+ if (!cache.context().dataRegion().config().isCdcEnabled()) {
+ throw new IgniteException("CDC is not enabled for given
cache [cacheName=" + name +
+ ", dataRegionName=" +
cache.context().dataRegion().config().getName() + ']');
+ }
+
+ if (cache.context().mvccEnabled())
+ throw new UnsupportedOperationException("The
TRANSACTIONAL_SNAPSHOT mode is not supported.");
+
+ caches.add(cache);
+ }
+
+ if (log.isInfoEnabled())
+ log.info("CDC cache data resend started [caches=" +
String.join(", ", arg.caches()) + ']');
+
+ wal = ignite.context().cache().context().wal(true);
+ exchange = ignite.context().cache().context().exchange();
+
+ try {
+ Iterator<IgniteInternalCache<?, ?>> iter = caches.iterator();
+
+ while (iter.hasNext() && !isCancelled())
+ resendCacheData(iter.next());
+
+ wal.flush(null, true);
+
+ if (log.isInfoEnabled()) {
+ log.info("CDC cache data resend " + (isCancelled() ?
"cancelled" : "finished") +
+ " [caches=" + String.join(", ", arg.caches()) + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ return null;
+ }
+
+ /** @param cache Cache. */
+ private void resendCacheData(IgniteInternalCache<?, ?> cache) throws
IgniteCheckedException {
+ if (log.isInfoEnabled())
+ log.info("CDC cache data resend started [cacheName=" +
cache.name() + ']');
+
+ GridCacheContext<?, ?> cctx = cache.context();
+
+ GridIterator<CacheDataRow> localRows = cctx.offheap()
+ .cacheIterator(cctx.cacheId(), true, false,
AffinityTopologyVersion.NONE, null, null);
+
+ long cnt = 0;
+ Set<Integer> parts = new TreeSet<>();
+
+ for (CacheDataRow row : localRows) {
+ if (isCancelled())
+ break;
+
+ ensureTopologyNotChanged();
+
+ KeyCacheObject key = row.key();
+
+ if (log.isTraceEnabled())
+ log.trace("Resend key: " + key);
+
+ CdcDataRecord rec = new CdcDataRecord(new DataEntry(
+ cctx.cacheId(),
+ key,
+ row.value(),
+ GridCacheOperation.CREATE,
+ null,
+ row.version(),
+ row.expireTime(),
+ key.partition(),
+ -1,
+ DataEntry.flags(true))
+ );
+
+ wal.log(rec);
+
+ parts.add(key.partition());
+
+ if ((++cnt % 1_000 == 0) && log.isDebugEnabled())
+ log.debug("Resend entries count: " + cnt);
+ }
+
+ if (log.isInfoEnabled()) {
+ if (isCancelled())
+ log.info("CDC cache data resend cancelled.");
+ else {
+ log.info("CDC cache data resend finished [cacheName=" +
cache.name() +
+ ", entriesCnt=" + cnt + ", parts=" + parts + ']');
+ }
+ }
+ }
+
+ /** */
+ private void ensureTopologyNotChanged() {
+ GridDhtPartitionsExchangeFuture fut =
exchange.lastFinishedFuture();
+
+ if (lastFut != fut) {
+ AffinityTopologyVersion lastChanged =
exchange.lastAffinityChangedTopologyVersion(fut.topologyVersion());
+
+ if (!topVer.equals(lastChanged)) {
+ throw new IgniteException("CDC cache data resend
cancelled. Topology changed during resend " +
+ "[startTopVer=" + topVer + ", currentTopVer=" +
fut.topologyVersion() + ']');
+ }
+
+ lastFut = fut;
+ }
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java
new file mode 100644
index 00000000000..9f64f42955d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cdc;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Set;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+public class VisorCdcCacheDataResendTaskArg extends IgniteDataTransferObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache names. */
+ private Set<String> caches;
+
+ /** */
+ public VisorCdcCacheDataResendTaskArg() {
+ // No-op.
+ }
+
+ /** */
+ public VisorCdcCacheDataResendTaskArg(Set<String> caches) {
+ this.caches = caches;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws
IOException {
+ U.writeCollection(out, caches);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(byte protoVer, ObjectInput in)
throws IOException, ClassNotFoundException {
+ caches = U.readSet(in);
+ }
+
+ /** @return Cache names. */
+ public Set<String> caches() {
+ return caches;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
index 6222fb2a68f..3c8c32c7a07 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
@@ -344,7 +344,12 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
/** @return Read keys. */
public List<T> data(ChangeEventType op, int cacheId) {
- return data.get(F.t(op, cacheId));
+ return data.computeIfAbsent(F.t(op, cacheId), k -> new
ArrayList<>());
+ }
+
+ /** */
+ public void clear() {
+ data.clear();
}
/** */
@@ -405,7 +410,8 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
String typeName = m.typeName();
assertFalse(typeName.isEmpty());
- assertEquals(mapper.typeId(typeName), m.typeId());
+ // Can also be registered by OptimizedMarshaller.
+ assertTrue(m.typeId() == mapper.typeId(typeName) || m.typeId()
== typeName.hashCode());
});
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 63ee1a405ad..60ffcee8686 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -112,6 +112,7 @@ import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_RECYCLE;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REMOVE;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REPLACE;
+import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CONSISTENT_CUT;
@@ -191,6 +192,7 @@ public class RecordUtils {
put(PAGE_RECORD, RecordUtils::buildPageSnapshot);
put(DATA_RECORD, RecordUtils::buildDataRecord);
put(DATA_RECORD_V2, RecordUtils::buildDataRecord);
+ put(CDC_DATA_RECORD, RecordUtils::buildDataRecord);
put(CHECKPOINT_RECORD, RecordUtils::buildCheckpointRecord);
put(HEADER_RECORD, buildUpsupportedWalRecord(HEADER_RECORD));
put(INIT_NEW_PAGE_RECORD, RecordUtils::buildInitNewPageRecord);
diff --git
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
index ef8ff50fb03..6987c0be58c 100644
---
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
+++
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
@@ -351,7 +351,14 @@ If the file name isn't specified the output file name is:
'<typeId>.bin'
control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id]
[--yes]
Parameters:
- node_id - ID of the node to delete lost segment links from. If not set,
the command will affect all server nodes.
+ --node-id node_id - ID of the node to delete lost segment links from.
If not set, the command will affect all server nodes.
+
+ [EXPERIMENTAL]
+ Forcefully resend all cache data to CDC. Iterates over caches and writes
primary copies of data entries to the WAL to get captured by CDC:
+ control.(sh|bat) --cdc resend --caches cache1,...,cacheN
+
+ Parameters:
+ --caches cache1,...,cacheN - specifies a comma-separated list of cache
names.
By default commands affecting the cluster require interactive confirmation.
Use --yes option to disable it.
diff --git
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
index ef8ff50fb03..6987c0be58c 100644
---
a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
+++
b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
@@ -351,7 +351,14 @@ If the file name isn't specified the output file name is:
'<typeId>.bin'
control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id]
[--yes]
Parameters:
- node_id - ID of the node to delete lost segment links from. If not set,
the command will affect all server nodes.
+ --node-id node_id - ID of the node to delete lost segment links from.
If not set, the command will affect all server nodes.
+
+ [EXPERIMENTAL]
+ Forcefully resend all cache data to CDC. Iterates over caches and writes
primary copies of data entries to the WAL to get captured by CDC:
+ control.(sh|bat) --cdc resend --caches cache1,...,cacheN
+
+ Parameters:
+ --caches cache1,...,cacheN - specifies a comma-separated list of cache
names.
By default commands affecting the cluster require interactive confirmation.
Use --yes option to disable it.