This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch cache_dumps in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/cache_dumps by this push: new 94628d583ab Code review changes 94628d583ab is described below commit 94628d583abb28102133fd078f4163f31ba83172 Author: nizhikov <nizhi...@apache.org> AuthorDate: Mon Oct 9 08:12:58 2023 +0300 Code review changes --- .../org/apache/ignite/internal/cdc/CdcMain.java | 9 +++-- .../snapshot/AbstractCreateSnapshotFutureTask.java | 4 +- .../snapshot/SnapshotPartitionsVerifyHandler.java | 8 ++-- .../snapshot/dump/CreateDumpFutureTask.java | 43 +++++++++++++++++----- .../cache/persistence/snapshot/dump/Dump.java | 12 +++--- .../wal/reader/IgniteWalIteratorFactory.java | 8 ++-- .../wal/reader/StandaloneGridKernalContext.java | 4 +- .../junits/GridTestKernalContext.java | 4 +- 8 files changed, 56 insertions(+), 36 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index da0e791a905..2e5914d58d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -82,8 +82,8 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll; +import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents; +import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; /** @@ -330,7 +330,7 @@ public class CdcMain implements Runnable { } } finally { - closeAll(kctx); + closeAllComponents(kctx); if (log.isInfoEnabled()) log.info("Ignite Change Data Capture Application stopped."); @@ -371,7 +371,7 @@ public class CdcMain implements Runnable { kctx.resource().setSpringContext(ctx); - startAll(kctx); + startAllComponents(kctx); mreg = kctx.metric().registry("cdc"); @@ -853,6 +853,7 @@ public class CdcMain implements Runnable { /** * @param files Mapping files. + * @param filter Filter. * @return Type mapping iterator. */ public static Iterator<TypeMapping> typeMappingIterator(File[] files, Predicate<TypeMapping> filter) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java index 89ae911cf5d..a2de0d59e64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java @@ -44,9 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; -/** - * - */ +/** */ public abstract class AbstractCreateSnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskResult> { /** * Cache group and corresponding partitions collected under the PME lock. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index 3d14d937fc9..3ea6d4c1264 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -83,8 +83,8 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll; +import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents; +import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents; import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash; import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum; @@ -200,7 +200,7 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part EncryptionCacheKeyProvider snpEncrKeyProvider = new SnapshotEncryptionKeyProvider(cctx.kernalContext(), grpDirs); - startAll(snpCtx); + 
startAllComponents(snpCtx); try { U.doInParallel( @@ -313,7 +313,7 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part throw t; } finally { - closeAll(snpCtx); + closeAllComponents(snpCtx); } return res; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java index de1009059b0..237665c2ab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -59,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSn import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.lang.GridCloseableIterator; @@ -372,7 +374,10 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple /** Partition id. 
*/ final int part; - /** Hashes of cache keys of entries changed by the user during partition dump. */ + /** + * Key is cache id, value is the set of keys dumped via + * {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)}. + */ final Map<Integer, Set<KeyCacheObject>> changed; /** Count of entries changed during dump creation. */ @@ -381,23 +386,41 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple /** Partition dump file. Lazily initialized to prevent creation files for empty partitions. */ final FileIO file; - /** Last version on time of dump start. Can be used only for primary. */ + /** + * Regular updates with {@link IgniteCache#put(Object, Object)} and similar calls + * will use version generated with {@link GridCacheVersionManager#next(GridCacheVersion)}. + * Version increases monotonically. + * Version is generated on <b>primary</b> node and propagated to backups. + * So on primary we can distinguish updates that happen before and after dump start comparing versions + * with the version we read with {@link GridCacheVersionManager#last()}. + */ @Nullable final GridCacheVersion startVer; - /** Last version on time of dump start. Can be used only for primary. */ + /** + * Unlike regular update, {@link IgniteDataStreamer} updates receive the same version for all entries. + * See {@code IsolatedUpdater.receive}. + * Note, using {@link IgniteDataStreamer} during cache dump creation can lead to dump inconsistency. + * + * @see GridCacheVersionManager#isolatedStreamerVersion() + */ final GridCacheVersion isolatedStreamerVer; /** Topology Version. */ private final AffinityTopologyVersion topVer; /** Partition serializer. */ - private final DumpEntrySerializer serdes; + private final DumpEntrySerializer serializer; /** If {@code true} context is closed. */ volatile boolean closed; /** Count of writers. When count becomes {@code 0} context must be closed. 
*/ - private final AtomicInteger writers = new AtomicInteger(1); // Iterator writing entries to this context, by default. + /** + * Count of writers. When count becomes {@code 0} context must be closed. + * By default, one writer exists - partition iterator. + * Each call of {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)} increments writers count. + * When count of writers becomes zero we are good to release all resources associated with partition dump. + */ + private final AtomicInteger writers = new AtomicInteger(1); /** * @param gctx Group context. @@ -415,7 +438,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple startVer = grpPrimaries.get(gctx.groupId()).contains(part) ? gctx.shared().versions().last() : null; isolatedStreamerVer = cctx.versions().isolatedStreamerVersion(); - serdes = new DumpEntrySerializer(thLocBufs); + serializer = new DumpEntrySerializer(thLocBufs); changed = new HashMap<>(); for (int cache : gctx.cacheIds()) @@ -486,7 +509,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple * @param key Key. * @param val Value. * @param ver Version. - * @return {@code True} if entry was written in dump, + * @return {@code True} if entry was written in dump. * {@code false} if it was already written by {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)}. */ public boolean writeForIterator( @@ -519,9 +542,9 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple /** */ private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val) { - synchronized (serdes) { // Prevent concurrent access to the dump file. + synchronized (serializer) { // Prevent concurrent access to the dump file. 
try { - ByteBuffer buf = serdes.writeToBuffer(cache, expireTime, key, val, cctx.cacheObjectContext(cache)); + ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, cctx.cacheObjectContext(cache)); if (file.writeFully(buf) != buf.limit()) throw new IgniteException("Can't write row"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java index 2496cd210c0..70cf5ec850e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java @@ -70,8 +70,8 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll; +import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents; +import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents; /** * This class provides ability to work with saved cache dump. @@ -86,9 +86,7 @@ public class Dump implements AutoCloseable { /** Specific consistent id. 
*/ private final @Nullable String consistentId; - /** - * Kernal context for each node in dump. - */ + /** Kernal context for each node in dump. */ private final GridKernalContext cctx; /** If {@code true} then return data in form of {@link BinaryObject}. */ @@ -149,7 +147,7 @@ public class Dump implements AutoCloseable { try { GridKernalContext kctx = new StandaloneGridKernalContext(log, binaryMeta, marshaller); - startAll(kctx); + startAllComponents(kctx); return kctx; } @@ -351,7 +349,7 @@ public class Dump implements AutoCloseable { /** {@inheritDoc} */ @Override public void close() throws Exception { - closeAll(cctx); + closeAllComponents(cctx); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 2535af7fadf..d01a2f84139 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -56,8 +56,8 @@ import static java.lang.System.arraycopy; import static java.nio.file.Files.walkFileTree; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll; +import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents; +import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition; @@ -177,7 +177,7 @@ public class IgniteWalIteratorFactory { if (iteratorParametersBuilder.sharedCtx == null) { GridCacheSharedContext<?, ?> sctx = prepareSharedCtx(iteratorParametersBuilder); - startAll(sctx.kernalContext()); + startAllComponents(sctx.kernalContext()); return new StandaloneWalRecordsIterator( iteratorParametersBuilder.log == null ? log : iteratorParametersBuilder.log, @@ -194,7 +194,7 @@ public class IgniteWalIteratorFactory { @Override protected void onClose() throws IgniteCheckedException { super.onClose(); - closeAll(sctx.kernalContext()); + closeAllComponents(sctx.kernalContext()); } }; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index a166840f2fb..1103f198dde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -699,7 +699,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { * @param kctx Kernal context. * @throws IgniteCheckedException In case of any error. 
*/ - public static void startAll(GridKernalContext kctx) throws IgniteCheckedException { + public static void startAllComponents(GridKernalContext kctx) throws IgniteCheckedException { for (GridComponent comp : kctx) comp.start(); } @@ -708,7 +708,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { * @param kctx Kernal context. * @throws IgniteCheckedException In case of any error. */ - public static void closeAll(GridKernalContext kctx) throws IgniteCheckedException { + public static void closeAllComponents(GridKernalContext kctx) throws IgniteCheckedException { for (GridComponent comp : kctx) comp.stop(true); } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index 20d6a4bfbaf..30eb29cde09 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -40,7 +40,7 @@ import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi; import org.apache.ignite.testframework.GridTestUtils; -import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll; +import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents; /** * Test context. @@ -96,7 +96,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { * @throws IgniteCheckedException If failed */ public void start() throws IgniteCheckedException { - startAll(this); + startAllComponents(this); } /**