This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch cache_dumps
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/cache_dumps by this push:
     new 94628d583ab Code review changes
94628d583ab is described below

commit 94628d583abb28102133fd078f4163f31ba83172
Author: nizhikov <nizhi...@apache.org>
AuthorDate: Mon Oct 9 08:12:58 2023 +0300

    Code review changes
---
 .../org/apache/ignite/internal/cdc/CdcMain.java    |  9 +++--
 .../snapshot/AbstractCreateSnapshotFutureTask.java |  4 +-
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |  8 ++--
 .../snapshot/dump/CreateDumpFutureTask.java        | 43 +++++++++++++++++-----
 .../cache/persistence/snapshot/dump/Dump.java      | 12 +++---
 .../wal/reader/IgniteWalIteratorFactory.java       |  8 ++--
 .../wal/reader/StandaloneGridKernalContext.java    |  4 +-
 .../junits/GridTestKernalContext.java              |  4 +-
 8 files changed, 56 insertions(+), 36 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index da0e791a905..2e5914d58d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -82,8 +82,8 @@ import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 
 /**
@@ -330,7 +330,7 @@ public class CdcMain implements Runnable {
                 }
             }
             finally {
-                closeAll(kctx);
+                closeAllComponents(kctx);
 
                 if (log.isInfoEnabled())
                     log.info("Ignite Change Data Capture Application 
stopped.");
@@ -371,7 +371,7 @@ public class CdcMain implements Runnable {
 
         kctx.resource().setSpringContext(ctx);
 
-        startAll(kctx);
+        startAllComponents(kctx);
 
         mreg = kctx.metric().registry("cdc");
 
@@ -853,6 +853,7 @@ public class CdcMain implements Runnable {
 
     /**
      * @param files Mapping files.
+     * @param filter Filter.
      * @return Type mapping iterator.
      */
     public static Iterator<TypeMapping> typeMappingIterator(File[] files, 
Predicate<TypeMapping> filter) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
index 89ae911cf5d..a2de0d59e64 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
@@ -44,9 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 
-/**
- *
- */
+/** */
 public abstract class AbstractCreateSnapshotFutureTask extends 
AbstractSnapshotFutureTask<SnapshotFutureTaskResult> {
     /**
      * Cache group and corresponding partitions collected under the PME lock.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 3d14d937fc9..3ea6d4c1264 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -83,8 +83,8 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
 import static 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
 import static 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
 
@@ -200,7 +200,7 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
 
         EncryptionCacheKeyProvider snpEncrKeyProvider = new 
SnapshotEncryptionKeyProvider(cctx.kernalContext(), grpDirs);
 
-        startAll(snpCtx);
+        startAllComponents(snpCtx);
 
         try {
             U.doInParallel(
@@ -313,7 +313,7 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
             throw t;
         }
         finally {
-            closeAll(snpCtx);
+            closeAllComponents(snpCtx);
         }
 
         return res;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index de1009059b0..237665c2ab0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
@@ -59,6 +60,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSn
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
@@ -372,7 +374,10 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
         /** Partition id. */
         final int part;
 
-        /** Hashes of cache keys of entries changed by the user during 
partition dump. */
+        /**
+         * Key is cache id, value is the set of keys dumped via
+         * {@link #writeChanged(int, long, KeyCacheObject, CacheObject, 
GridCacheVersion)}.
+         */
         final Map<Integer, Set<KeyCacheObject>> changed;
 
         /** Count of entries changed during dump creation. */
@@ -381,23 +386,41 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
         /** Partition dump file. Lazily initialized to prevent creation files 
for empty partitions. */
         final FileIO file;
 
-        /** Last version on time of dump start. Can be used only for primary. 
*/
+        /**
+         * Regular updates with {@link IgniteCache#put(Object, Object)} and 
similar calls
+         * will use version generated with {@link 
GridCacheVersionManager#next(GridCacheVersion)}.
+         * Version is monotonically increasing.
+         * Version is generated on the <b>primary</b> node and propagated to backups.
+         * So on primary we can distinguish updates that happen before and 
after dump start by comparing versions
+         * with the version we read with {@link 
GridCacheVersionManager#last()}.
+         */
         @Nullable final GridCacheVersion startVer;
 
-        /** Last version on time of dump start. Can be used only for primary. 
*/
+        /**
+         * Unlike regular update, {@link IgniteDataStreamer} updates receive 
the same version for all entries.
+         * See {@code IsolatedUpdater.receive}.
+         * Note, using {@link IgniteDataStreamer} during cache dump creation 
can lead to dump inconsistency.
+         *
+         * @see GridCacheVersionManager#isolatedStreamerVersion()
+         */
         final GridCacheVersion isolatedStreamerVer;
 
         /** Topology Version. */
         private final AffinityTopologyVersion topVer;
 
         /** Partition serializer. */
-        private final DumpEntrySerializer serdes;
+        private final DumpEntrySerializer serializer;
 
         /** If {@code true} context is closed. */
         volatile boolean closed;
 
-        /** Count of writers. When count becomes {@code 0} context must be 
closed. */
-        private final AtomicInteger writers = new AtomicInteger(1); // 
Iterator writing entries to this context, by default.
+        /**
+         * Count of writers. When count becomes {@code 0} context must be 
closed.
+         * By default, one writer exists - partition iterator.
+         * Each call of {@link #writeChanged(int, long, KeyCacheObject, 
CacheObject, GridCacheVersion)} increments writers count.
+         * When count of writers becomes zero we are good to release all resources 
associated with partition dump.
+         */
+        private final AtomicInteger writers = new AtomicInteger(1);
 
         /**
          * @param gctx Group context.
@@ -415,7 +438,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                 startVer = grpPrimaries.get(gctx.groupId()).contains(part) ? 
gctx.shared().versions().last() : null;
                 isolatedStreamerVer = 
cctx.versions().isolatedStreamerVersion();
 
-                serdes = new DumpEntrySerializer(thLocBufs);
+                serializer = new DumpEntrySerializer(thLocBufs);
                 changed = new HashMap<>();
 
                 for (int cache : gctx.cacheIds())
@@ -486,7 +509,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
          * @param key Key.
          * @param val Value.
          * @param ver Version.
-         * @return {@code True} if entry was written in dump,
+         * @return {@code True} if entry was written in dump.
          * {@code false} if it was already written by {@link 
#writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)}.
          */
         public boolean writeForIterator(
@@ -519,9 +542,9 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
 
         /** */
         private void write(int cache, long expireTime, KeyCacheObject key, 
CacheObject val) {
-            synchronized (serdes) { // Prevent concurrent access to the dump 
file.
+            synchronized (serializer) { // Prevent concurrent access to the 
dump file.
                 try {
-                    ByteBuffer buf = serdes.writeToBuffer(cache, expireTime, 
key, val, cctx.cacheObjectContext(cache));
+                    ByteBuffer buf = serializer.writeToBuffer(cache, 
expireTime, key, val, cctx.cacheObjectContext(cache));
 
                     if (file.writeFully(buf) != buf.limit())
                         throw new IgniteException("Can't write row");
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
index 2496cd210c0..70cf5ec850e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
@@ -70,8 +70,8 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
 
 /**
  * This class provides ability to work with saved cache dump.
@@ -86,9 +86,7 @@ public class Dump implements AutoCloseable {
     /** Specific consistent id. */
     private final @Nullable String consistentId;
 
-    /**
-     * Kernal context for each node in dump.
-     */
+    /** Kernal context for each node in dump. */
     private final GridKernalContext cctx;
 
     /** If {@code true} then return data in form of {@link BinaryObject}. */
@@ -149,7 +147,7 @@ public class Dump implements AutoCloseable {
         try {
             GridKernalContext kctx = new StandaloneGridKernalContext(log, 
binaryMeta, marshaller);
 
-            startAll(kctx);
+            startAllComponents(kctx);
 
             return kctx;
         }
@@ -351,7 +349,7 @@ public class Dump implements AutoCloseable {
 
     /** {@inheritDoc} */
     @Override public void close() throws Exception {
-        closeAll(cctx);
+        closeAllComponents(cctx);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 2535af7fadf..d01a2f84139 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -56,8 +56,8 @@ import static java.lang.System.arraycopy;
 import static java.nio.file.Files.walkFileTree;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
 
@@ -177,7 +177,7 @@ public class IgniteWalIteratorFactory {
         if (iteratorParametersBuilder.sharedCtx == null) {
             GridCacheSharedContext<?, ?> sctx = 
prepareSharedCtx(iteratorParametersBuilder);
 
-            startAll(sctx.kernalContext());
+            startAllComponents(sctx.kernalContext());
 
             return new StandaloneWalRecordsIterator(
                 iteratorParametersBuilder.log == null ? log : 
iteratorParametersBuilder.log,
@@ -194,7 +194,7 @@ public class IgniteWalIteratorFactory {
                 @Override protected void onClose() throws 
IgniteCheckedException {
                     super.onClose();
 
-                    closeAll(sctx.kernalContext());
+                    closeAllComponents(sctx.kernalContext());
                 }
             };
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index a166840f2fb..1103f198dde 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -699,7 +699,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
      * @param kctx Kernal context.
      * @throws IgniteCheckedException In case of any error.
      */
-    public static void startAll(GridKernalContext kctx) throws 
IgniteCheckedException {
+    public static void startAllComponents(GridKernalContext kctx) throws 
IgniteCheckedException {
         for (GridComponent comp : kctx)
             comp.start();
     }
@@ -708,7 +708,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
      * @param kctx Kernal context.
      * @throws IgniteCheckedException In case of any error.
      */
-    public static void closeAll(GridKernalContext kctx) throws 
IgniteCheckedException {
+    public static void closeAllComponents(GridKernalContext kctx) throws 
IgniteCheckedException {
         for (GridComponent comp : kctx)
             comp.stop(true);
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 20d6a4bfbaf..30eb29cde09 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -40,7 +40,7 @@ import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
 
 /**
  * Test context.
@@ -96,7 +96,7 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
      * @throws IgniteCheckedException If failed
      */
     public void start() throws IgniteCheckedException {
-        startAll(this);
+        startAllComponents(this);
     }
 
     /**

Reply via email to