This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch cache_dumps
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/cache_dumps by this push:
new 8018473c5f9 IGNITE-20472 Dump API implemented (#10953)
8018473c5f9 is described below
commit 8018473c5f918c8a9e7865b655e182c5226544e8
Author: Nikolay <[email protected]>
AuthorDate: Fri Oct 6 10:26:43 2023 +0300
IGNITE-20472 Dump API implemented (#10953)
---
.../java/org/apache/ignite/dump/DumpConsumer.java | 83 ++++++
.../persistence/snapshot => }/dump/DumpEntry.java | 16 +-
.../java/org/apache/ignite/dump/DumpReader.java | 210 ++++++++++++++
.../ignite/dump/DumpReaderConfiguration.java | 112 ++++++++
.../org/apache/ignite/internal/cdc/CdcMain.java | 57 ++--
.../org/apache/ignite/internal/cdc/CdcUtils.java | 60 ++++
.../pagemem/wal/record/UnwrapDataEntry.java | 48 ++--
.../snapshot/SnapshotPartitionsVerifyHandler.java | 40 +--
.../snapshot/dump/CreateDumpFutureTask.java | 57 ++--
.../cache/persistence/snapshot/dump/Dump.java | 119 ++++++--
.../snapshot/dump/DumpEntrySerializer.java | 27 +-
.../wal/reader/IgniteWalIteratorFactory.java | 13 +-
.../wal/reader/StandaloneGridKernalContext.java | 18 ++
.../snapshot/dump/AbstractCacheDumpTest.java | 306 +++++++++++++++------
.../snapshot/dump/IgniteCacheDumpSelfTest.java | 9 +-
.../dump/IgniteConcurrentCacheDumpTest.java | 4 +-
.../junits/GridTestKernalContext.java | 5 +-
.../ignite/internal/dump/DumpCacheConfigTest.java | 108 +++++---
18 files changed, 1031 insertions(+), 261 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/dump/DumpConsumer.java
b/modules/core/src/main/java/org/apache/ignite/dump/DumpConsumer.java
new file mode 100644
index 00000000000..aec3a02c997
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpConsumer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.dump;
+
+import java.util.Iterator;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Consumer of {@link Dump}.
+ * This consumer will receive all {@link DumpEntry} stored in cache dump
during {@link DumpReader} application invocation.
+ * The lifecycle of the consumer is the following:
+ * <ul>
+ * <li>Start of the consumer {@link #start()}.</li>
+ * <li>Stop of the consumer {@link #stop()}.</li>
+ * </ul>
+ *
+ */
+@IgniteExperimental
+public interface DumpConsumer {
+ /**
+ * Starts the consumer.
+ */
+ void start();
+
+ /**
+ * Handles type mappings.
+ * @param mappings Mappings iterator.
+ */
+ void onMappings(Iterator<TypeMapping> mappings);
+
+ /**
+ * Handles binary types.
+ * @param types Binary types iterator.
+ */
+ void onTypes(Iterator<BinaryType> types);
+
+ /**
+ * Handles cache configs.
+ * Note, there can be several copies of cache config in the dump.
+ * This can happen if dump contains data from several nodes.
+ * @param caches Stored cache data.
+ */
+ void onCacheConfigs(Iterator<StoredCacheData> caches);
+
+ /**
+ * Handles cache data.
+ * This method can be invoked by several threads concurrently.
+ * Note, there can be several copies of group partition in the dump.
+ * This can happen if dump contains data from several nodes.
+ * In this case callback will be invoked several times for the same pair of
[grp, part] values.
+ *
+ * @param grp Group id.
+ * @param part Partition.
+ * @param data Cache data iterator.
+ * @see DumpReaderConfiguration#threadCount()
+ */
+ void onPartition(int grp, int part, Iterator<DumpEntry> data);
+
+ /**
+ * Stops the consumer.
+ * This method can be invoked only after {@link #start()}.
+ */
+ void stop();
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntry.java
b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
similarity index 71%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntry.java
rename to modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
index 5054c97806c..ca34f3cbace 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
@@ -15,14 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+package org.apache.ignite.dump;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import java.util.Iterator;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
+import org.apache.ignite.lang.IgniteExperimental;
/**
* Single cache entry from dump.
+ *
+ * @see Dump#iterator(String, int, int)
+ * @see DumpConsumer#onPartition(int, int, Iterator)
+ * @see org.apache.ignite.IgniteSnapshot#createDump(String)
*/
+@IgniteExperimental
public interface DumpEntry {
/** @return Cache id. */
public int cacheId();
@@ -31,8 +37,8 @@ public interface DumpEntry {
public long expireTime();
/** @return Key. */
- public KeyCacheObject key();
+ public Object key();
/** @return Value. */
- public CacheObject value();
+ public Object value();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
new file mode 100644
index 00000000000..37923316617
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.dump;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridLoggerProxy;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.cdc.CdcMain;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+
+/**
+ * Dump Reader application.
+ * The application runs independently of the Ignite node process and allows a
{@link DumpConsumer} to consume
+ * all data stored in a cache dump ({@link Dump}).
+ */
+public class DumpReader implements Runnable {
+ /** Configuration. */
+ private final DumpReaderConfiguration cfg;
+
+ /** Log. */
+ private final IgniteLogger log;
+
+ /**
+ * @param cfg Dump reader configuration.
+ * @param log Logger.
+ */
+ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {
+ this.cfg = cfg;
+ this.log = log.getLogger(DumpReader.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ackAsciiLogo();
+
+ try (Dump dump = new Dump(cfg.dumpRoot(), cfg.keepBinary(), false,
log)) {
+ DumpConsumer cnsmr = cfg.consumer();
+
+ cnsmr.start();
+
+ try {
+ File[] files = new File(cfg.dumpRoot(),
DFLT_MARSHALLER_PATH).listFiles(BinaryUtils::notTmpFile);
+
+ if (files != null)
+ cnsmr.onMappings(CdcMain.typeMappingIterator(files, tm ->
true));
+
+ cnsmr.onTypes(dump.types());
+
+ Map<Integer, List<String>> grpToNodes = new HashMap<>();
+
+ for (SnapshotMetadata meta : dump.metadata()) {
+ for (Integer grp : meta.cacheGroupIds())
+ grpToNodes.computeIfAbsent(grp, key -> new
ArrayList<>()).add(meta.folderName());
+ }
+
+ cnsmr.onCacheConfigs(grpToNodes.entrySet().stream()
+ .flatMap(e -> dump.configs(F.first(e.getValue()),
e.getKey()).stream())
+ .iterator());
+
+ ExecutorService execSvc = cfg.threadCount() > 1 ?
Executors.newFixedThreadPool(cfg.threadCount()) : null;
+
+ AtomicBoolean skip = new AtomicBoolean(false);
+
+ for (Map.Entry<Integer, List<String>> e :
grpToNodes.entrySet()) {
+ int grp = e.getKey();
+
+ for (String node : e.getValue()) {
+ for (int part : dump.partitions(node, grp)) {
+ Runnable consumePart = () -> {
+ if (skip.get()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skip partition due to
previous error [node=" + node + ", grp=" + grp +
+ ", part=" + part + ']');
+ }
+
+ return;
+ }
+
+ try (DumpedPartitionIterator iter =
dump.iterator(node, grp, part)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Consuming partition [node="
+ node + ", grp=" + grp +
+ ", part=" + part + ']');
+ }
+
+ cnsmr.onPartition(grp, part, iter);
+ }
+ catch (Exception ex) {
+ skip.set(cfg.failFast());
+
+ log.error("Error consuming partition
[node=" + node + ", grp=" + grp +
+ ", part=" + part + ']', ex);
+
+ throw new IgniteException(ex);
+ }
+ };
+
+ if (cfg.threadCount() > 1)
+ execSvc.submit(consumePart);
+ else
+ consumePart.run();
+ }
+ }
+ }
+
+ if (cfg.threadCount() > 1) {
+ execSvc.shutdown();
+
+ boolean res =
execSvc.awaitTermination(cfg.timeout().toMillis(), MILLISECONDS);
+
+ if (!res) {
+ log.warning("Dump processing tasks not finished after
timeout. Cancelling");
+
+ execSvc.shutdownNow();
+ }
+ }
+ }
+ finally {
+ cnsmr.stop();
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ private void ackAsciiLogo() {
+ String ver = "ver. " + ACK_VER_STR;
+
+ if (log.isInfoEnabled()) {
+ log.info(NL + NL +
+ ">>> __________ ________________ ___ __ ____ ______
___ _______ ___ _______" + NL +
+ ">>> / _/ ___/ |/ / _/_ __/ __/ / _ \\/ / / / |/ / _ \\
/ _ \\/ __/ _ | / _ \\/ __/ _ \\" + NL +
+ ">>> _/ // (_ / // / / / / _/ / // / /_/ / /|_/ / ___/ /
, _/ _// __ |/ // / _// , _/" + NL +
+ ">>> /___/\\___/_/|_/___/ /_/ /___/ /____/\\____/_/ /_/_/
/_/|_/___/_/ |_/____/___/_/|_|" + NL +
+ ">>> " + NL +
+ ">>> " + ver + NL +
+ ">>> " + COPYRIGHT + NL +
+ ">>> " + NL +
+ ">>> Ignite documentation: " + "http://" + SITE + NL +
+ ">>> ConsistentId: " + cfg.dumpRoot() + NL +
+ ">>> Consumer: " + U.toStringSafe(cfg.consumer())
+ );
+ }
+
+ if (log.isQuiet()) {
+ U.quiet(false,
+ " __________ ________________ ___ __ ____ ______ ___
_______ ___ _______",
+ " / _/ ___/ |/ / _/_ __/ __/ / _ \\/ / / / |/ / _ \\ /
_ \\/ __/ _ | / _ \\/ __/ _ \\",
+ " _/ // (_ / // / / / / _/ / // / /_/ / /|_/ / ___/ / ,
_/ _// __ |/ // / _// , _/",
+ "/___/\\___/_/|_/___/ /_/ /___/ /____/\\____/_/ /_/_/
/_/|_/___/_/ |_/____/___/_/|_|",
+ "",
+ ver,
+ COPYRIGHT,
+ "",
+ "Ignite documentation: " + "http://" + SITE,
+ "Dump: " + cfg.dumpRoot(),
+ "Consumer: " + U.toStringSafe(cfg.consumer()),
+ "",
+ "Quiet mode.");
+
+ String fileName = log.fileName();
+
+ if (fileName != null)
+ U.quiet(false, " ^-- Logging to file '" + fileName + '\'');
+
+ if (log instanceof GridLoggerProxy)
+ U.quiet(false, " ^-- Logging by '" +
((GridLoggerProxy)log).getLoggerInfo() + '\'');
+
+ U.quiet(false,
+ " ^-- To see **FULL** console log here add
-DIGNITE_QUIET=false or \"-v\" to ignite-cdc.{sh|bat}",
+ "");
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
new file mode 100644
index 00000000000..c967136fe81
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.dump;
+
+import java.io.File;
+import java.time.Duration;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Configuration class of {@link DumpReader}.
+ *
+ * @see DumpReader
+ * @see DumpConsumer
+ */
+@IgniteExperimental
+public class DumpReaderConfiguration {
+ /** Default timeout. */
+ public static final Duration DFLT_TIMEOUT = Duration.ofDays(7);
+
+ /** Default thread count. */
+ public static final int DFLT_THREAD_CNT = 1;
+
+ /** Root dump directory. */
+ private final File dir;
+
+ /** Dump consumer. */
+ private final DumpConsumer cnsmr;
+
+ /** Count of threads to consume dumped partitions. */
+ private final int thCnt;
+
+ /** Timeout of dump reader. 1 week, by default. */
+ private final Duration timeout;
+
+ /** Stop processing partitions if consumer fails to process one. */
+ private final boolean failFast;
+
+ /** If {@code true} then don't deserialize {@link KeyCacheObject} and
{@link CacheObject}. */
+ private final boolean keepBinary;
+
+ /**
+ * @param dir Root dump directory.
+ * @param cnsmr Dump consumer.
+ */
+ public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
+ this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true);
+ }
+
+ /**
+ * @param dir Root dump directory.
+ * @param cnsmr Dump consumer.
+ * @param thCnt Count of threads to consume dumped partitions.
+ * @param timeout Timeout of dump reader invocation.
+ * @param failFast Stop processing partitions if consumer fails to process
one.
+ * @param keepBinary If {@code true} then don't deserialize {@link
KeyCacheObject} and {@link CacheObject}.
+ */
+ public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt,
Duration timeout, boolean failFast, boolean keepBinary) {
+ this.dir = dir;
+ this.cnsmr = cnsmr;
+ this.thCnt = thCnt;
+ this.timeout = timeout;
+ this.failFast = failFast;
+ this.keepBinary = keepBinary;
+ }
+
+ /** @return Root dump directory. */
+ public File dumpRoot() {
+ return dir;
+ }
+
+ /** @return Dump consumer instance. */
+ public DumpConsumer consumer() {
+ return cnsmr;
+ }
+
+ /** @return Count of threads to consume dumped partitions. */
+ public int threadCount() {
+ return thCnt;
+ }
+
+ /** @return Timeout of dump reader invocation. */
+ public Duration timeout() {
+ return timeout;
+ }
+
+ /** @return {@code True} if stop processing after first consumer error. */
+ public boolean failFast() {
+ return failFast;
+ }
+
+ /** @return If {@code true} then don't deserialize {@link KeyCacheObject}
and {@link CacheObject}. */
+ public boolean keepBinary() {
+ return keepBinary;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 1e73e75be22..da0e791a905 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -44,7 +45,6 @@ import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
@@ -82,6 +82,8 @@ import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
/**
@@ -328,8 +330,7 @@ public class CdcMain implements Runnable {
}
}
finally {
- for (GridComponent comp : kctx)
- comp.stop(false);
+ closeAll(kctx);
if (log.isInfoEnabled())
log.info("Ignite Change Data Capture Application
stopped.");
@@ -370,8 +371,7 @@ public class CdcMain implements Runnable {
kctx.resource().setSpringContext(ctx);
- for (GridComponent comp : kctx)
- comp.start();
+ startAll(kctx);
mreg = kctx.metric().registry("cdc");
@@ -647,27 +647,10 @@ public class CdcMain implements Runnable {
if (files == null)
return;
- Iterator<TypeMapping> changedMappings = Arrays.stream(files)
- .map(f -> {
- String fileName = f.getName();
-
- int typeId = BinaryUtils.mappedTypeId(fileName);
- byte platformId =
BinaryUtils.mappedFilePlatformId(fileName);
-
- T2<Integer, Byte> state = new T2<>(typeId, platformId);
-
- if (mappingsState.contains(state))
- return null;
-
- mappingsState.add(state);
-
- return (TypeMapping)new TypeMappingImpl(
- typeId,
- BinaryUtils.readMapping(f),
- platformId == 0 ? PlatformType.JAVA :
PlatformType.DOTNET);
- })
- .filter(Objects::nonNull)
- .iterator();
+ Iterator<TypeMapping> changedMappings = typeMappingIterator(
+ files,
+ tm -> mappingsState.add(new T2<>(tm.typeId(),
(byte)tm.platformType().ordinal()))
+ );
if (!changedMappings.hasNext())
return;
@@ -867,4 +850,26 @@ public class CdcMain implements Runnable {
public static String cdcInstanceName(String igniteInstanceName) {
return "cdc-" + igniteInstanceName;
}
+
+ /**
+ * @param files Mapping files.
+ * @return Type mapping iterator.
+ */
+ public static Iterator<TypeMapping> typeMappingIterator(File[] files,
Predicate<TypeMapping> filter) {
+ return Arrays.stream(files)
+ .map(f -> {
+ String fileName = f.getName();
+
+ int typeId = BinaryUtils.mappedTypeId(fileName);
+ byte platformId = BinaryUtils.mappedFilePlatformId(fileName);
+
+ return (TypeMapping)new TypeMappingImpl(
+ typeId,
+ BinaryUtils.readMapping(f),
+ platformId == 0 ? PlatformType.JAVA : PlatformType.DOTNET);
+ })
+ .filter(filter)
+ .filter(Objects::nonNull)
+ .iterator();
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtils.java
new file mode 100644
index 00000000000..c53fc7fed98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+
+/**
+ * Reusable utility methods for CDC-like applications.
+ */
+public class CdcUtils {
+ /**
+ * Register {@code meta}.
+ *
+ * @param ctx Binary context.
+ * @param log Logger.
+ * @param meta Binary metadata to register.
+ */
+ public static void registerBinaryMeta(BinaryContext ctx, IgniteLogger log,
BinaryMetadata meta) {
+ ctx.updateMetadata(meta.typeId(), meta, false);
+
+ if (log.isInfoEnabled())
+ log.info("BinaryMeta [meta=" + meta + ']');
+ }
+
+ /**
+ * Register {@code mapping}.
+ *
+ * @param ctx Binary context.
+ * @param log Logger.
+ * @param mapping Type mapping to register.
+ */
+ public static void registerMapping(BinaryContext ctx, IgniteLogger log,
TypeMapping mapping) {
+ assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;
+
+ byte platformType = (byte)mapping.platformType().ordinal();
+
+ ctx.registerUserClassName(mapping.typeId(), mapping.typeName(), false,
false, platformType);
+
+ if (log.isInfoEnabled())
+ log.info("Mapping [mapping=" + mapping + ']');
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
index 216e7332fbd..f69f579084f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
@@ -71,18 +71,7 @@ public class UnwrapDataEntry extends DataEntry implements
UnwrappedDataEntry {
/** {@inheritDoc} */
@Override public Object unwrappedKey() {
try {
- if (keepBinary && key instanceof BinaryObject)
- return key;
-
- Object unwrapped = key.value(cacheObjValCtx, false);
-
- if (unwrapped instanceof BinaryObject) {
- if (keepBinary)
- return unwrapped;
- unwrapped = ((BinaryObject)unwrapped).deserialize();
- }
-
- return unwrapped;
+ return unwrapKey(key, keepBinary, cacheObjValCtx);
}
catch (Exception e) {
cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class)
@@ -95,13 +84,7 @@ public class UnwrapDataEntry extends DataEntry implements
UnwrappedDataEntry {
/** {@inheritDoc} */
@Override public Object unwrappedValue() {
try {
- if (val == null)
- return null;
-
- if (keepBinary && val instanceof BinaryObject)
- return val;
-
- return val.value(cacheObjValCtx, false);
+ return unwrapValue(val, keepBinary, cacheObjValCtx);
}
catch (Exception e) {
cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class)
@@ -110,6 +93,33 @@ public class UnwrapDataEntry extends DataEntry implements
UnwrappedDataEntry {
}
}
+ /** */
+ public static Object unwrapKey(KeyCacheObject key, boolean keepBinary,
CacheObjectValueContext cacheObjValCtx) {
+ if (keepBinary && key instanceof BinaryObject)
+ return key;
+
+ Object unwrapped = key.value(cacheObjValCtx, false);
+
+ if (unwrapped instanceof BinaryObject) {
+ if (keepBinary)
+ return unwrapped;
+ unwrapped = ((BinaryObject)unwrapped).deserialize();
+ }
+
+ return unwrapped;
+ }
+
+ /** */
+ public static Object unwrapValue(CacheObject val, boolean keepBinary,
CacheObjectValueContext cacheObjValCtx) {
+ if (val == null)
+ return null;
+
+ if (keepBinary && val instanceof BinaryObject)
+ return val;
+
+ return val.value(cacheObjValCtx, false);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
SB sb = new SB();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 02be48a5620..c36917ee023 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -38,7 +38,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
@@ -46,14 +46,15 @@ import
org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider
import org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpEntry;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
@@ -82,6 +83,8 @@ import static
org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
@@ -197,8 +200,7 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
EncryptionCacheKeyProvider snpEncrKeyProvider = new
SnapshotEncryptionKeyProvider(cctx.kernalContext(), grpDirs);
- for (GridComponent comp : snpCtx)
- comp.start();
+ startAll(snpCtx);
try {
U.doInParallel(
@@ -311,8 +313,7 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
throw t;
}
finally {
- for (GridComponent comp : snpCtx)
- comp.stop(true);
+ closeAll(snpCtx);
}
return res;
@@ -356,23 +357,12 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
private Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles(
SnapshotHandlerContext opCtx,
Set<File> partFiles
- ) throws IgniteCheckedException {
- GridKernalContext snpCtx =
cctx.snapshotMgr().createStandaloneKernalContext(
- cctx.kernalContext().compress(),
- opCtx.snapshotDirectory(),
- opCtx.metadata().folderName()
- );
-
- for (GridComponent comp : snpCtx)
- comp.start();
-
- try {
- Dump dump = new Dump(snpCtx, opCtx.snapshotDirectory());
-
+ ) {
+ try (Dump dump = new Dump(opCtx.snapshotDirectory(), true, true, log))
{
Collection<PartitionHashRecordV2> partitionHashRecordV2s =
U.doInParallel(
cctx.snapshotMgr().snapshotExecutorService(),
partFiles,
- part -> caclucateDumpedPartitionHash(dump,
cacheGroupName(part.getParentFile()), partId(part.getName()))
+ part -> calculateDumpedPartitionHash(dump,
cacheGroupName(part.getParentFile()), partId(part.getName()))
);
return
partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey,
r -> r));
@@ -380,16 +370,12 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
catch (Throwable t) {
log.error("Error executing handler: ", t);
- throw t;
- }
- finally {
- for (GridComponent comp : snpCtx)
- comp.stop(true);
+ throw new IgniteException(t);
}
}
/** */
- private PartitionHashRecordV2 caclucateDumpedPartitionHash(Dump dump,
String grpName, int part) {
+ private PartitionHashRecordV2 calculateDumpedPartitionHash(Dump dump,
String grpName, int part) {
if (skipHash()) {
return new PartitionHashRecordV2(
new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
@@ -413,7 +399,7 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
while (iter.hasNext()) {
DumpEntry e = iter.next();
- ctx.update(e.key(), e.value(), null);
+ ctx.update((KeyCacheObject)e.key(),
(CacheObject)e.value(), null);
size++;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index e1f98c68404..f3bc78e457a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -36,10 +36,12 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -217,6 +219,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
long start = System.currentTimeMillis();
AtomicLong entriesCnt = new AtomicLong();
+ AtomicLong writtenEntriesCnt = new AtomicLong();
AtomicLong changedEntriesCnt = new AtomicLong();
String name = cctx.cache().cacheGroup(grp).cacheOrGroupName();
@@ -228,6 +231,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
List<CompletableFuture<Void>> futs = grpParts.stream().map(part ->
runAsync(() -> {
long entriesCnt0 = 0;
+ long writtenEntriesCnt0 = 0;
try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
try (GridCloseableIterator<CacheDataRow> rows =
gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
@@ -242,11 +246,14 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
int cache = row.cacheId() == 0 ? grp : row.cacheId();
if (dumpCtx.writeForIterator(cache, row.expireTime(),
row.key(), row.value(), row.version()))
- entriesCnt0++;
+ writtenEntriesCnt0++;
+
+ entriesCnt0++;
}
}
entriesCnt.addAndGet(entriesCnt0);
+ writtenEntriesCnt.addAndGet(writtenEntriesCnt0);
changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue());
if (log.isDebugEnabled()) {
@@ -254,8 +261,9 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
", id=" + grp +
", part=" + part +
", time=" + (System.currentTimeMillis() - start) +
- ", iteratorEntriesCount=" + entriesCnt +
- ", changedEntriesCount=" + changedEntriesCnt + ']');
+ ", iterEntriesCnt=" + entriesCnt +
+ ", writtenIterEntriesCnt=" + entriesCnt +
+ ", changedEntriesCnt=" + changedEntriesCnt + ']');
}
}
@@ -270,8 +278,9 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
log.info("Finish group dump [name=" + name +
", id=" + grp +
", time=" + (System.currentTimeMillis() - start) +
- ", iteratorEntriesCount=" + entriesCnt +
- ", changedEntriesCount=" + changedEntriesCnt + ']');
+ ", iterEntriesCnt=" + entriesCnt.get() +
+ ", writtenIterEntriesCnt=" + writtenEntriesCnt.get() +
+ ", changedEntriesCnt=" + changedEntriesCnt.get() + ']');
}
});
@@ -281,9 +290,15 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
/** {@inheritDoc} */
@Override public void beforeChange(GridCacheContext cctx, KeyCacheObject
key, CacheObject val, long expireTime, GridCacheVersion ver) {
try {
- assert key.partition() != -1;
+ int part = key.partition();
+ int grp = cctx.groupId();
+
+ assert part != -1;
- dumpContext(cctx.groupId(),
key.partition()).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
+ if (!parts.get(grp).contains(part))
+ return;
+
+ dumpContext(grp, part).writeChanged(cctx.cacheId(), expireTime,
key, val, ver);
}
catch (IgniteException e) {
acceptException(e);
@@ -369,6 +384,9 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
/** Last version on time of dump start. Can be used only for primary.
*/
@Nullable final GridCacheVersion startVer;
+ /** Isolated data streamer version. Entries with it are not treated as new.
*/
+ final GridCacheVersion isolatedStreamerVer;
+
/** Topology Version. */
private final AffinityTopologyVersion topVer;
@@ -395,6 +413,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
topVer = gctx.topology().lastTopologyChangeVersion();
startVer = grpPrimaries.get(gctx.groupId()).contains(part) ?
gctx.shared().versions().last() : null;
+ isolatedStreamerVer =
cctx.versions().isolatedStreamerVersion();
serdes = new DumpEntrySerializer(thLocBufs);
changed = new HashMap<>();
@@ -422,13 +441,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
* @param val Value before change.
* @param ver Version before change.
*/
- public void writeChanged(
- int cache,
- long expireTime,
- KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver
- ) {
+ public void writeChanged(int cache, long expireTime, KeyCacheObject
key, CacheObject val, GridCacheVersion ver) {
String reasonToSkip = null;
if (closed) // Quick exit. Partition already saved in dump.
@@ -439,7 +452,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
try {
if (closed) // Partition already saved in dump.
reasonToSkip = "partition already saved";
- else if (startVer != null && ver.isGreater(startVer))
+ else if (isAfterStart(ver))
reasonToSkip = "greater version";
else if (!changed.get(cache).add(key)) // Entry changed
several time during dump.
reasonToSkip = "changed several times";
@@ -485,7 +498,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
) {
boolean written = true;
- if (startVer != null && ver.isGreater(startVer))
+ if (isAfterStart(ver))
written = false;
else if (changed.get(cache).contains(key))
written = false;
@@ -519,6 +532,18 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
}
}
+ /**
+ * Note, usage of {@link IgniteDataStreamer} may lead to dump
inconsistency,
+ * because the streamer uses the same {@link GridCacheVersion} for all
entries.
+ * So, we can't efficiently filter out new entries and have to write them all
to dump.
+ *
+ * @param ver Entry version.
+ * @return {@code True} if {@code ver} appeared after dump started.
+ */
+ private boolean isAfterStart(GridCacheVersion ver) {
+ return (startVer != null && ver.isGreater(startVer)) &&
!isolatedStreamerVer.equals(ver);
+ }
+
/** {@inheritDoc} */
@Override public void close() {
synchronized (this) { // Prevent concurrent close invocation.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
index b63b86ed63e..38baf38356b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -35,16 +36,24 @@ import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -60,17 +69,30 @@ import static
org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
/**
* This class provides ability to work with saved cache dump.
*/
-public class Dump {
+public class Dump implements AutoCloseable {
+ /** Snapshot meta. */
+ private final List<SnapshotMetadata> metadata;
+
/** Dump directory. */
private final File dumpDir;
- /** Kernal context. */
+ /**
+ * Standalone kernal context used to read the dump.
+ */
private final GridKernalContext cctx;
+ /** If {@code true} then return data in form of {@link BinaryObject}. */
+ private final boolean keepBinary;
+
+ /** If {@code true} then don't deserialize {@link KeyCacheObject} and
{@link CacheObject}. */
+ private final boolean raw;
+
/**
* Map shared across all instances of {@link DumpEntrySerializer}.
* We use per thread buffer because number of threads is fewer then number
of partitions.
@@ -80,36 +102,77 @@ public class Dump {
private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new
ConcurrentHashMap<>();
/**
- * @param cctx Kernal context.
* @param dumpDir Dump directory.
+ * @param keepBinary If {@code true} then return data in form of {@link
BinaryObject}.
*/
- public Dump(GridKernalContext cctx, File dumpDir) {
- this.cctx = cctx;
+ public Dump(File dumpDir, boolean keepBinary, boolean raw, IgniteLogger
log) {
+ A.ensure(dumpDir != null, "dump directory is null");
+ A.ensure(dumpDir.exists(), "dump directory not exists");
+
this.dumpDir = dumpDir;
+ this.metadata = metadata(dumpDir);
+ this.keepBinary = keepBinary;
+ this.cctx = standaloneKernalContext(dumpDir, log);
+ this.raw = raw;
+ }
- File binaryMeta = new File(dumpDir, DFLT_BINARY_METADATA_PATH);
+ /**
+ * @param dumpDir Dump directory.
+ * @param log Logger.
+ * @return Standalone kernal context.
+ */
+ private GridKernalContext standaloneKernalContext(File dumpDir,
IgniteLogger log) {
+ File binaryMeta =
CacheObjectBinaryProcessorImpl.binaryWorkDir(dumpDir.getAbsolutePath(),
F.first(metadata).folderName());
File marshaller = new File(dumpDir, DFLT_MARSHALLER_PATH);
- A.ensure(dumpDir != null, "dump directory is null");
- A.ensure(dumpDir.exists(), "dump directory not exists");
A.ensure(binaryMeta.exists(), "binary metadata directory not exists");
A.ensure(marshaller.exists(), "marshaller directory not exists");
+
+ try {
+ GridKernalContext kctx = new StandaloneGridKernalContext(log,
binaryMeta, marshaller);
+
+ startAll(kctx);
+
+ return kctx;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** @return Binary types iterator. */
+ public Iterator<BinaryType> types() {
+ return cctx.cacheObjects().metadata().iterator();
}
/** @return List of node directories. */
public List<String> nodesDirectories() {
- return Arrays.stream(new File(dumpDir, DFLT_STORE_DIR).listFiles(f ->
f.isDirectory() &&
- !(f.getAbsolutePath().endsWith(DFLT_BINARY_METADATA_PATH)
- ||
f.getAbsolutePath().endsWith(DFLT_MARSHALLER_PATH)))).map(File::getName).collect(Collectors.toList());
+ File[] dirs = new File(dumpDir, DFLT_STORE_DIR).listFiles(f ->
f.isDirectory()
+ && !(f.getAbsolutePath().endsWith(DFLT_BINARY_METADATA_PATH) ||
f.getAbsolutePath().endsWith(DFLT_MARSHALLER_PATH)));
+
+ if (dirs == null)
+ return Collections.emptyList();
+
+ return
Arrays.stream(dirs).map(File::getName).collect(Collectors.toList());
}
/** @return List of snapshot metadata saved in {@link #dumpDir}. */
public List<SnapshotMetadata> metadata() throws IOException,
IgniteCheckedException {
- JdkMarshaller marsh =
MarshallerUtils.jdkMarshaller(cctx.igniteInstanceName());
+ return metadata;
+ }
- ClassLoader clsLdr = U.resolveClassLoader(cctx.config());
+ /** @return List of snapshot metadata saved in {@link #dumpDir}. */
+ private static List<SnapshotMetadata> metadata(File dumpDir) {
+ JdkMarshaller marsh = MarshallerUtils.jdkMarshaller("fake-node");
+
+ ClassLoader clsLdr = U.resolveClassLoader(new IgniteConfiguration());
+
+ File[] files = dumpDir.listFiles(f ->
f.getName().endsWith(SNAPSHOT_METAFILE_EXT));
- return Arrays.stream(dumpDir.listFiles(f ->
f.getName().endsWith(SNAPSHOT_METAFILE_EXT))).map(meta -> {
+ if (files == null)
+ return Collections.emptyList();
+
+ return Arrays.stream(files).map(meta -> {
try (InputStream in = new
BufferedInputStream(Files.newInputStream(meta.toPath()))) {
return marsh.<SnapshotMetadata>unmarshal(in, clsLdr);
}
@@ -143,8 +206,13 @@ public class Dump {
* @return Dump iterator.
*/
public List<Integer> partitions(String node, int group) {
- return Arrays.stream(dumpGroupDirectory(node, group)
- .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
f.getName().endsWith(DUMP_FILE_EXT)))
+ File[] parts = dumpGroupDirectory(node, group)
+ .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
f.getName().endsWith(DUMP_FILE_EXT));
+
+ if (parts == null)
+ return Collections.emptyList();
+
+ return Arrays.stream(parts)
.map(partFile ->
Integer.parseInt(partFile.getName().replace(PART_FILE_PREFIX,
"").replace(DUMP_FILE_EXT, "")))
.collect(Collectors.toList());
}
@@ -155,16 +223,6 @@ public class Dump {
* @return Dump iterator.
*/
public DumpedPartitionIterator iterator(String node, int group, int part) {
- return iterator(node, group, part, true);
- }
-
- /**
- * @param node Node directory name.
- * @param group Group id.
- * @param excludeDuplicates Skip entries that was dumped twice - by
iterator and change listener.
- * @return Dump iterator.
- */
- DumpedPartitionIterator iterator(String node, int group, int part, boolean
excludeDuplicates) {
FileIOFactory ioFactory = new RandomAccessFileIOFactory();
FileIO dumpFile;
@@ -179,11 +237,13 @@ public class Dump {
DumpEntrySerializer serializer = new DumpEntrySerializer(thLocBufs);
serializer.kernalContext(cctx);
+ serializer.keepBinary(keepBinary);
+ serializer.raw(raw);
return new DumpedPartitionIterator() {
DumpEntry next;
- Set<KeyCacheObject> partKeys = new HashSet<>();
+ Set<Object> partKeys = new HashSet<>();
/** {@inheritDoc} */
@Override public boolean hasNext() {
@@ -268,6 +328,11 @@ public class Dump {
return grpDirs[0];
}
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ closeAll(cctx);
+ }
+
/**
* Closeable dump iterator.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
index 1e8b14d9886..52b33e65be4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,6 +52,12 @@ public class DumpEntrySerializer {
/** Fake context. */
private CacheObjectContext fakeCacheObjCtx;
+ /** If {@code true} then don't deserialize {@link KeyCacheObject} and
{@link CacheObject}. */
+ private boolean raw = false;
+
+ /** If {@code true} then return data in form of {@link BinaryObject}. */
+ private boolean keepBinary;
+
/**
* @param thLocBufs Thread local buffers.
*/
@@ -62,6 +71,16 @@ public class DumpEntrySerializer {
fakeCacheObjCtx = new CacheObjectContext(cctx, null, null, false,
false, false, false, false);
}
+ /** @param keepBinary If {@code true} then return data in form of {@link
BinaryObject}. */
+ public void keepBinary(boolean keepBinary) {
+ this.keepBinary = keepBinary;
+ }
+
+ /** @param raw If {@code true} then don't deserialize {@link
KeyCacheObject} and {@link CacheObject}. */
+ public void raw(boolean raw) {
+ this.raw = raw;
+ }
+
/**
* Dump entry structure:
* <pre>
@@ -202,12 +221,12 @@ public class DumpEntrySerializer {
return expireTime;
}
- @Override public KeyCacheObject key() {
- return key;
+ @Override public Object key() {
+ return raw ? key : UnwrapDataEntry.unwrapKey(key, keepBinary,
fakeCacheObjCtx);
}
- @Override public CacheObject value() {
- return val;
+ @Override public Object value() {
+ return raw ? val : UnwrapDataEntry.unwrapValue(val,
keepBinary, fakeCacheObjCtx);
}
};
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 05c40db53bd..2535af7fadf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -34,7 +34,6 @@ import java.util.TreeSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
@@ -57,6 +56,8 @@ import static java.lang.System.arraycopy;
import static java.nio.file.Files.walkFileTree;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
@@ -176,8 +177,7 @@ public class IgniteWalIteratorFactory {
if (iteratorParametersBuilder.sharedCtx == null) {
GridCacheSharedContext<?, ?> sctx =
prepareSharedCtx(iteratorParametersBuilder);
- for (GridComponent comp : sctx.kernalContext())
- comp.start();
+ startAll(sctx.kernalContext());
return new StandaloneWalRecordsIterator(
iteratorParametersBuilder.log == null ? log :
iteratorParametersBuilder.log,
@@ -194,8 +194,7 @@ public class IgniteWalIteratorFactory {
@Override protected void onClose() throws
IgniteCheckedException {
super.onClose();
- for (GridComponent comp : sctx.kernalContext())
- comp.stop(true);
+ closeAll(sctx.kernalContext());
}
};
}
@@ -690,9 +689,9 @@ public class IgniteWalIteratorFactory {
/**
*
*/
- private static class ConsoleLogger implements IgniteLogger {
+ public static class ConsoleLogger implements IgniteLogger {
/** */
- private static final ConsoleLogger INSTANCE = new ConsoleLogger();
+ public static final ConsoleLogger INSTANCE = new ConsoleLogger();
/** */
private static final PrintStream OUT = System.out;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 8c1f52a4c15..a166840f2fb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -694,4 +694,22 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
@Override public Executor getAsyncContinuationExecutor() {
return null;
}
+
+ /**
+ * @param kctx Kernal context.
+ * @throws IgniteCheckedException In case of any error.
+ */
+ public static void startAll(GridKernalContext kctx) throws
IgniteCheckedException {
+ for (GridComponent comp : kctx)
+ comp.start();
+ }
+
+ /**
+ * @param kctx Kernal context.
+ * @throws IgniteCheckedException In case of any error.
+ */
+ public static void closeAll(GridKernalContext kctx) throws
IgniteCheckedException {
+ for (GridComponent comp : kctx)
+ comp.stop(true);
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
index 5e7164c52fe..d4524b5eb1b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
@@ -19,8 +19,8 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
import java.io.File;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -31,24 +31,30 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpConsumer;
+import org.apache.ignite.dump.DumpEntry;
+import org.apache.ignite.dump.DumpReader;
+import org.apache.ignite.dump.DumpReaderConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -63,8 +69,11 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_THREAD_CNT;
+import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_TIMEOUT;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_RUNNING_DIR_KEY;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.toLong;
import static org.apache.ignite.platform.model.AccessLevel.SUPER;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -88,7 +97,7 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
public static final String DMP_NAME = "dump";
/** */
- static final IntFunction<User> USER_FACTORY = i ->
+ public static final IntFunction<User> USER_FACTORY = i ->
new User(i, ACL.values()[Math.abs(i) % ACL.values().length], new
Role("Role" + i, SUPER));
/** */
@@ -108,7 +117,15 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
public CacheAtomicityMode mode;
/** */
- @Parameterized.Parameters(name =
"nodes={0},backups={1},persistence={2},mode={3}")
+ @Parameterized.Parameter(4)
+ public boolean useDataStreamer;
+
+ /** */
+ @Parameterized.Parameter(5)
+ public boolean onlyPrimary;
+
+ /** */
+ @Parameterized.Parameters(name =
"nodes={0},backups={1},persistence={2},mode={3},useDataStreamer={4},onlyPrimary={5}")
public static List<Object[]> params() {
List<Object[]> params = new ArrayList<>();
@@ -116,10 +133,17 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
for (int backups : new int[]{0, 1})
for (boolean persistence : new boolean[]{true, false})
for (CacheAtomicityMode mode :
CacheAtomicityMode._values()) {
- if (nodes == 1 && backups != 0)
- continue;
-
- params.add(new Object[]{nodes, backups, persistence,
mode});
+ for (boolean useDataStreamer : new boolean[]{true,
false}) {
+ if (nodes == 1 && backups != 0)
+ continue;
+
+ if (backups > 0) {
+ params.add(new Object[]{nodes, backups,
persistence, mode, useDataStreamer, false});
+ params.add(new Object[]{nodes, backups,
persistence, mode, useDataStreamer, true});
+ }
+ else
+ params.add(new Object[]{nodes, backups,
persistence, mode, useDataStreamer, false});
+ }
}
return params;
@@ -183,7 +207,7 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
ign.cluster().state(ClusterState.ACTIVE);
- putData(ign.cache(DEFAULT_CACHE_NAME), ign.cache(CACHE_0),
ign.cache(CACHE_1));
+ putData(cli.cache(DEFAULT_CACHE_NAME), cli.cache(CACHE_0),
cli.cache(CACHE_1));
return ign;
}
@@ -205,7 +229,7 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
});
}
- IgniteInternalFuture<Object> dumpFut = runAsync(() ->
createDump((IgniteEx)ignites.get(0)));
+ IgniteInternalFuture<Object> dumpFut = runAsync(() ->
createDump((IgniteEx)F.first(ignites)));
// Waiting while dump will be setup: task planned after change
listener set.
assertTrue(waitForCondition(() -> {
@@ -229,15 +253,30 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
IgniteCache<Object, Object> grpCache0,
IgniteCache<Object, Object> grpCache1
) {
- IntStream.range(0, KEYS_CNT).forEach(i -> {
- cache.put(i, i);
- grpCache0.put(i, USER_FACTORY.apply(i));
- grpCache1.put(new Key(i), new Value(String.valueOf(i)));
- });
+ if (useDataStreamer) {
+ try (
+ IgniteDataStreamer<Integer, Integer> _cache =
cli.dataStreamer(cache.getName());
+ IgniteDataStreamer<Integer, User> _grpCache0 =
cli.dataStreamer(grpCache0.getName());
+ IgniteDataStreamer<Key, Value> _grpCache1 =
cli.dataStreamer(grpCache1.getName())
+ ) {
+ IntStream.range(0, KEYS_CNT).forEach(i -> {
+ _cache.addData(i, i);
+ _grpCache0.addData(i, USER_FACTORY.apply(i));
+ _grpCache1.addData(new Key(i), new
Value(String.valueOf(i)));
+ });
+ }
+ }
+ else {
+ IntStream.range(0, KEYS_CNT).forEach(i -> {
+ cache.put(i, i);
+ grpCache0.put(i, USER_FACTORY.apply(i));
+ grpCache1.put(new Key(i), new Value(String.valueOf(i)));
+ });
+ }
}
/** */
- void checkDump(IgniteEx ign) throws Exception {
+ protected void checkDump(IgniteEx ign) throws Exception {
checkDump(ign, DMP_NAME);
}
@@ -265,101 +304,117 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
assertEquals(nodes, nodesDirs.size());
- Set<Integer> keys = new HashSet<>();
- int dfltDumpSz = 0;
- int grpDumpSz = 0;
-
- CacheObjectContext coCtx =
ign.context().cache().context().cacheObjectContext(CU.cacheId(DEFAULT_CACHE_NAME));
- CacheObjectContext coCtx0 =
ign.context().cache().context().cacheObjectContext(CU.cacheId(CACHE_0));
- CacheObjectContext coCtx1 =
ign.context().cache().context().cacheObjectContext(CU.cacheId(CACHE_1));
-
- for (String nodeDir : nodesDirs) {
- List<StoredCacheData> ccfgs = dump.configs(nodeDir,
CU.cacheId(DEFAULT_CACHE_NAME));
+ TestDumpConsumer cnsmr = new TestDumpConsumer() {
+ final Set<Integer> keys = new HashSet<>();
- assertNotNull(ccfgs);
- assertEquals(1, ccfgs.size());
+ final Set<Long> grpParts = new HashSet<>();
- assertEquals(DEFAULT_CACHE_NAME,
ccfgs.get(0).configuration().getName());
- assertFalse(ccfgs.get(0).sql());
- assertTrue(ccfgs.get(0).queryEntities().isEmpty());
+ int dfltDumpSz;
- ccfgs = dump.configs(nodeDir, CU.cacheId(GRP));
+ int grpDumpSz;
- assertNotNull(ccfgs);
- assertEquals(2, ccfgs.size());
+ @Override public void onCacheConfigs(Iterator<StoredCacheData>
caches) {
+ super.onCacheConfigs(caches);
- ccfgs.sort(Comparator.comparing(d -> d.config().getName()));
+ boolean[] cachesFound = new boolean[3];
- CacheConfiguration ccfg0 = ccfgs.get(0).configuration();
- CacheConfiguration ccfg1 = ccfgs.get(1).configuration();
+ caches.forEachRemaining(data -> {
+ if (data.config().getName().equals(DEFAULT_CACHE_NAME)) {
+ assertFalse(cachesFound[0]);
+ cachesFound[0] = true;
- assertEquals(GRP, ccfg0.getGroupName());
- assertEquals(CACHE_0, ccfg0.getName());
-
- assertEquals(GRP, ccfg1.getGroupName());
- assertEquals(CACHE_1, ccfg1.getName());
+ assertEquals(DEFAULT_CACHE_NAME,
data.config().getName());
+ assertFalse(data.sql());
+ assertTrue(data.queryEntities().isEmpty());
+ }
+ else if (data.config().getName().equals(CACHE_0)) {
+ assertFalse(cachesFound[1]);
+ cachesFound[1] = true;
+
+ assertEquals(GRP, data.configuration().getGroupName());
+ assertEquals(CACHE_0, data.configuration().getName());
+ assertFalse(data.sql());
+ assertTrue(data.queryEntities().isEmpty());
+ }
+ else if (data.config().getName().equals(CACHE_1)) {
+ assertFalse(cachesFound[2]);
+ cachesFound[2] = true;
+
+ assertEquals(GRP, data.configuration().getGroupName());
+ assertEquals(CACHE_1, data.configuration().getName());
+ assertFalse(data.sql());
+ assertTrue(data.queryEntities().isEmpty());
+ }
+ else
+ throw new IgniteException("Unknown cache");
+ });
- assertFalse(ccfgs.get(0).sql());
- assertFalse(ccfgs.get(1).sql());
- assertTrue(ccfgs.get(0).queryEntities().isEmpty());
- assertTrue(ccfgs.get(1).queryEntities().isEmpty());
+ for (boolean found : cachesFound)
+ assertTrue(found);
+ }
- List<Integer> parts = dump.partitions(nodeDir,
CU.cacheId(DEFAULT_CACHE_NAME));
+ @Override public void onPartition(int grp, int part,
Iterator<DumpEntry> iter) {
+ if (onlyPrimary)
+ assertTrue(grpParts.add(toLong(grp, part)));
- for (int part : parts) {
- try (DumpedPartitionIterator iter = dump.iterator(nodeDir,
CU.cacheId(DEFAULT_CACHE_NAME), part)) {
+ if (grp == CU.cacheId(DEFAULT_CACHE_NAME)) {
while (iter.hasNext()) {
DumpEntry e = iter.next();
- checkDefaultCacheEntry(e, coCtx);
+ checkDefaultCacheEntry(e);
- keys.add(e.key().<Integer>value(coCtx, true));
+ keys.add((Integer)e.key());
dfltDumpSz++;
}
}
- }
-
- parts = dump.partitions(nodeDir, CU.cacheId(GRP));
-
- for (int part : parts) {
- try (DumpedPartitionIterator iter = dump.iterator(nodeDir,
CU.cacheId(GRP), part)) {
+ else {
while (iter.hasNext()) {
DumpEntry e = iter.next();
- checkGroupEntry(e, coCtx0, coCtx1);
+ assertNotNull(e);
+
+ if (e.cacheId() == CU.cacheId(CACHE_0))
+ assertEquals(USER_FACTORY.apply((Integer)e.key()),
e.value());
+ else
+ assertEquals(((Key)e.key()).getId() + "",
((Value)e.value()).getVal());
grpDumpSz++;
}
}
}
- }
- assertEquals(KEYS_CNT + KEYS_CNT * backups, dfltDumpSz);
- assertEquals(2 * (KEYS_CNT + KEYS_CNT * backups), grpDumpSz);
+ @Override public void check() {
+ super.check();
- IntStream.range(0, KEYS_CNT).forEach(key ->
assertTrue(keys.contains(key)));
- }
-
- /** */
- protected void checkDefaultCacheEntry(DumpEntry e, CacheObjectContext
coCtx) {
- assertNotNull(e);
+ assertEquals(KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT *
backups), dfltDumpSz);
+ assertEquals(2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT *
backups)), grpDumpSz);
- Integer key = e.key().<Integer>value(coCtx, true);
+ IntStream.range(0, KEYS_CNT).forEach(key ->
assertTrue(keys.contains(key)));
+ }
+ };
- assertEquals(key, e.value().<Integer>value(coCtx, true));
+ new DumpReader(
+ new DumpReaderConfiguration(
+ dumpDirectory(ign, name),
+ cnsmr,
+ DFLT_THREAD_CNT, DFLT_TIMEOUT,
+ true,
+ false
+ ),
+ log
+ ).run();
+
+ cnsmr.check();
}
/** */
- protected void checkGroupEntry(DumpEntry e, CacheObjectContext coCtx0,
CacheObjectContext coCtx1) {
+ protected void checkDefaultCacheEntry(DumpEntry e) {
assertNotNull(e);
- if (e.cacheId() == CU.cacheId(CACHE_0))
- assertEquals(USER_FACTORY.apply(e.key().value(coCtx0, true)),
e.value().value(coCtx0, true));
- else {
- assertNotNull(e.key().<Key>value(coCtx1, true));
- assertNotNull(e.value().<Value>value(coCtx1, true));
- }
+ Integer key = (Integer)e.key();
+
+ assertEquals(key, e.value());
}
/** */
@@ -419,18 +474,25 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
}
/** */
- void createDump(IgniteEx ign) {
+ protected void createDump(IgniteEx ign) {
createDump(ign, DMP_NAME);
}
/** */
public static Dump dump(IgniteEx ign, String name) throws
IgniteCheckedException {
return new Dump(
- ign.context(),
- new File(U.resolveWorkDirectory(U.defaultWorkDirectory(),
ign.configuration().getSnapshotPath(), false), name)
+ dumpDirectory(ign, name),
+ true,
+ false,
+ log
);
}
+ /** @return File {@code name} under the node's configured snapshot path, resolved via {@code U.resolveWorkDirectory}. */
+ public static File dumpDirectory(IgniteEx ign, String name) throws
IgniteCheckedException {
+ return new File(U.resolveWorkDirectory(U.defaultWorkDirectory(),
ign.configuration().getSnapshotPath(), false), name);
+ }
+
/** */
public static void checkDumpWithCommand(IgniteEx ign, String name, int
backups) throws Exception {
CacheGroupContext gctx =
ign.context().cache().cacheGroup(CU.cacheId(DEFAULT_CACHE_NAME));
@@ -458,6 +520,88 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
/** */
void createDump(IgniteEx ign, String name) {
- ign.snapshot().createDump(name).get();
+ ign.context().cache().context().snapshotMgr().createSnapshot(name,
null, false, true, true).get();
+ }
+
+ /** Base consumer asserting callback order: start, onMappings, onTypes, onCacheConfigs, stop. Does not implement onPartition. */
+ public abstract static class TestDumpConsumer implements DumpConsumer {
+ /** Set by {@link #start()}. */
+ private boolean started;
+
+ /** Set by {@link #stop()}. */
+ private boolean stopped;
+
+ /** Set by {@link #onTypes(Iterator)}. */
+ private boolean typesCb;
+
+ /** Set by {@link #onMappings(Iterator)}. */
+ private boolean mappingcCb;
+
+ /** Set by {@link #onCacheConfigs(Iterator)}. */
+ private boolean cacheCfgCb;
+
+ /** {@inheritDoc} Asserts that no callback flag is set yet, then marks the consumer as started. */
+ @Override public void start() {
+ assertFalse(started);
+ assertFalse(mappingcCb);
+ assertFalse(typesCb);
+ assertFalse(cacheCfgCb);
+ assertFalse(stopped);
+
+ started = true;
+ }
+
+ /** {@inheritDoc} Asserts it runs after start and before types, cache configs and stop; the iterator is not read here. */
+ @Override public void onMappings(Iterator<TypeMapping> mappings) {
+ assertTrue(started);
+ assertFalse(mappingcCb);
+ assertFalse(typesCb);
+ assertFalse(cacheCfgCb);
+ assertFalse(stopped);
+
+ mappingcCb = true;
+ }
+
+ /** {@inheritDoc} Asserts it runs after start and mappings, before cache configs and stop; the iterator is not read here. */
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ assertTrue(started);
+ assertTrue(mappingcCb);
+ assertFalse(typesCb);
+ assertFalse(cacheCfgCb);
+ assertFalse(stopped);
+
+ typesCb = true;
+ }
+
+ /** {@inheritDoc} Asserts it runs after start, mappings and types, and before stop; the iterator is not read here. */
+ @Override public void onCacheConfigs(Iterator<StoredCacheData> caches)
{
+ assertTrue(started);
+ assertTrue(mappingcCb);
+ assertTrue(typesCb);
+ assertFalse(cacheCfgCb);
+ assertFalse(stopped);
+
+ cacheCfgCb = true;
+ }
+
+ /** {@inheritDoc} Asserts that all four earlier callbacks were seen and that stop was not called before. */
+ @Override public void stop() {
+ assertTrue(started);
+ assertTrue(typesCb);
+ assertTrue(mappingcCb);
+ assertTrue(cacheCfgCb);
+ assertFalse(stopped);
+
+ stopped = true;
+ }
+
+ /** Asserts that every lifecycle callback (start, mappings, types, cache configs, stop) was invoked. */
+ public void check() {
+ assertTrue(started);
+ assertTrue(typesCb);
+ assertTrue(mappingcCb);
+ assertTrue(cacheCfgCb);
+ assertTrue(stopped);
+ }
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
index 893d94f77d9..9491a3f6534 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
@@ -36,9 +36,9 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -55,6 +55,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_RUNNING_DIR_KEY;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.junit.Assume.assumeFalse;
/** */
public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
@@ -262,6 +263,8 @@ public class IgniteCacheDumpSelfTest extends
AbstractCacheDumpTest {
/** */
@Test
public void testDumpWithImplicitExpireTime() throws Exception {
+ assumeFalse(useDataStreamer);
+
explicitTtl = false;
doTestDumpWithExpiry();
@@ -452,8 +455,8 @@ public class IgniteCacheDumpSelfTest extends
AbstractCacheDumpTest {
}
/** {@inheritDoc} */
- @Override protected void checkDefaultCacheEntry(DumpEntry e,
CacheObjectContext coCtx) {
- super.checkDefaultCacheEntry(e, coCtx);
+ @Override protected void checkDefaultCacheEntry(DumpEntry e) {
+ super.checkDefaultCacheEntry(e);
if (explicitTtl != null) {
assertTrue("Expire time must be set", e.expireTime() != 0);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
index f087e5ccf77..a08f92dae72 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
@@ -36,7 +36,7 @@ import org.junit.runners.Parameterized;
/** */
public class IgniteConcurrentCacheDumpTest extends AbstractCacheDumpTest {
/** */
- @Parameterized.Parameters(name =
"nodes={0},backups={1},persistence={2},mode={3}")
+ @Parameterized.Parameters(name =
"nodes={0},backups={1},persistence={2},mode={3},useDataStreamer={4},onlyPrimary={5}")
public static List<Object[]> params() {
List<Object[]> params = new ArrayList<>();
@@ -44,7 +44,7 @@ public class IgniteConcurrentCacheDumpTest extends
AbstractCacheDumpTest {
for (int backups : new int[]{1, 2})
for (boolean persistence : new boolean[]{true, false})
for (CacheAtomicityMode mode :
CacheAtomicityMode._values())
- params.add(new Object[]{nodes, backups, persistence,
mode});
+ params.add(new Object[]{nodes, backups, persistence,
mode, false, false});
return params;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 1865af069b9..20d6a4bfbaf 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -40,6 +40,8 @@ import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import static
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
+
/**
* Test context.
*/
@@ -94,8 +96,7 @@ public class GridTestKernalContext extends
GridKernalContextImpl {
* @throws IgniteCheckedException If failed
*/
public void start() throws IgniteCheckedException {
- for (GridComponent comp : this)
- comp.start();
+ startAll(this);
}
/**
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
index e111223dc7e..18a24fbd63b 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
@@ -17,24 +17,27 @@
package org.apache.ignite.internal.dump;
+import java.io.File;
import java.util.Collection;
-import java.util.List;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.dump.DumpEntry;
+import org.apache.ignite.dump.DumpReader;
+import org.apache.ignite.dump.DumpReaderConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
-import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpEntry;
-import org.apache.ignite.internal.util.typedef.internal.CU;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.TestDumpConsumer;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.internal.cdc.SqlCdcTest.executeSql;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.DMP_NAME;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.KEYS_CNT;
-import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dump;
/** */
public class DumpCacheConfigTest extends GridCommonAbstractTest {
@@ -69,57 +72,78 @@ public class DumpCacheConfigTest extends
GridCommonAbstractTest {
private void checkDump(IgniteEx srv, String name, boolean first) throws
Exception {
srv.snapshot().createDump(name).get(10_000L);
- int grpId = CU.cacheId("T1");
- int cnt = 0;
+ AtomicInteger cnt = new AtomicInteger();
- Dump dump = dump(srv, name);
+ TestDumpConsumer cnsmr = new TestDumpConsumer() {
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ super.onTypes(types);
- for (String nodeDir : dump.nodesDirectories()) {
- for (int part : dump.partitions(nodeDir, grpId)) {
- try (DumpedPartitionIterator iter = dump.iterator(nodeDir,
grpId, part)) {
- while (iter.hasNext()) {
- DumpEntry e = iter.next();
+ assertTrue(types.hasNext());
- assertNotNull(e);
+ BinaryType type = types.next();
- int id = e.key().value(null, false);
+ assertFalse(types.hasNext());
- BinaryObject val = (BinaryObject)e.value();
+ assertTrue(type.typeName().startsWith("SQL_PUBLIC_T1"));
+ }
- assertNotNull(val);
- assertEquals("Name-" + id, val.field("NAME"));
+ @Override public void onCacheConfigs(Iterator<StoredCacheData>
caches) {
+ super.onCacheConfigs(caches);
- cnt++;
- }
- }
- }
+ assertTrue(caches.hasNext());
+
+ StoredCacheData data = caches.next();
+
+ assertFalse(caches.hasNext());
+
+ assertTrue(data.sql());
+
+ CacheConfiguration ccfg = data.config();
- List<StoredCacheData> ccfgs = dump.configs(nodeDir, grpId);
+ assertEquals("T1", ccfg.getName());
- assertNotNull(ccfgs);
- assertEquals(1, ccfgs.size());
- assertTrue(ccfgs.get(0).sql());
+ Collection<QueryEntity> qes = data.queryEntities();
- CacheConfiguration ccfg = ccfgs.get(0).config();
+ assertNotNull(qes);
+ assertEquals(1, qes.size());
- assertEquals("T1", ccfg.getName());
+ QueryEntity qe = qes.iterator().next();
- Collection<QueryEntity> qes = ccfgs.get(0).queryEntities();
+ assertNotNull(qe);
+ assertEquals("T1", qe.getTableName());
+ assertEquals(first ? 2 : 3, qe.getFields().size());
+ assertTrue(qe.getFields().containsKey("ID"));
+ assertTrue(qe.getFields().containsKey("NAME"));
+ if (!first)
+ assertTrue(qe.getFields().containsKey("ADDRESS"));
+ }
+
+ @Override public void onPartition(int grp, int part,
Iterator<DumpEntry> data) {
+ while (data.hasNext()) {
+ DumpEntry e = data.next();
+
+ assertNotNull(e);
+
+ BinaryObject val = (BinaryObject)e.value();
- assertNotNull(qes);
- assertEquals(1, qes.size());
+ assertNotNull(val);
+ assertEquals("Name-" + e.key(), val.field("NAME"));
+
+ cnt.incrementAndGet();
+ }
+ }
+ };
- QueryEntity qe = qes.iterator().next();
+ new DumpReader(
+ new DumpReaderConfiguration(
+ new File(U.resolveWorkDirectory(U.defaultWorkDirectory(),
srv.configuration().getSnapshotPath(), false), name),
+ cnsmr
+ ),
+ log
+ ).run();
- assertNotNull(qe);
- assertEquals("T1", qe.getTableName());
- assertEquals(first ? 2 : 3, qe.getFields().size());
- assertTrue(qe.getFields().containsKey("ID"));
- assertTrue(qe.getFields().containsKey("NAME"));
- if (!first)
- assertTrue(qe.getFields().containsKey("ADDRESS"));
- }
+ assertEquals(first ? KEYS_CNT : (KEYS_CNT * 2), cnt.get());
- assertEquals(first ? KEYS_CNT : (KEYS_CNT * 2), cnt);
+ cnsmr.check();
}
}