Repository: ignite Updated Branches: refs/heads/master 28f32b885 -> 932692ecc
IGNITE-7865 Supported serializerVersion method for WAL manager - Fixes #3594. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/932692ec Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/932692ec Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/932692ec Branch: refs/heads/master Commit: 932692ecc7a3e0feda856ae37bd2b2c182274b35 Parents: 28f32b8 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Fri Mar 2 15:11:50 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Mar 2 15:11:50 2018 +0300 ---------------------------------------------------------------------- .../pagemem/wal/IgniteWriteAheadLogManager.java | 5 + .../GridDhtPartitionsExchangeFuture.java | 9 +- .../wal/FileWriteAheadLogManager.java | 10 +- .../wal/FsyncModeFileWriteAheadLogManager.java | 112 ++++++++++++++++--- .../persistence/pagemem/NoOpWALManager.java | 5 + 5 files changed, 116 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index 6c3c36e..55806d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -38,6 +38,11 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public boolean isFullSync(); /** + * @return Current serializer version. + */ + public int serializerVersion(); + + /** * Resumes logging after start. When WAL manager is started, it will skip logging any updates until this * method is called to avoid logging changes induced by the state restore procedure. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a56c4e2..b4febf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -81,7 +81,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.BaselineTopology; @@ -830,7 +829,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte top.update(null, clientTop.partitionMap(true), clientTop.fullUpdateCounters(), - Collections.<Integer>emptySet(), + Collections.emptySet(), null); } } @@ -1694,7 +1693,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void logExchange(DiscoveryEvent evt) { if (cctx.kernalContext().state().publicApiActiveState(false) && cctx.wal() != null) { - if (((FileWriteAheadLogManager)cctx.wal()).serializerVersion() > 1) + if (cctx.wal().serializerVersion() > 1) try { ExchangeRecord.Type type = null; @@ -2248,7 +2247,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CounterWithNodes maxCntr = maxCntrs.get(part.id()); if (maxCntr == null && cntr == 0) { - CounterWithNodes cntrObj = new CounterWithNodes(cntr, cctx.localNodeId()); + CounterWithNodes cntrObj = new CounterWithNodes(0, cctx.localNodeId()); for (UUID nodeId : msgs.keySet()) { if (top.partitionState(nodeId, part.id()) == GridDhtPartitionState.OWNING) @@ -3087,7 +3086,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte top.update(resTopVer, entry.getValue(), cntrMap, - Collections.<Integer>emptySet(), + Collections.emptySet(), null); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 9bd3bfd..69ee96a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -645,10 +645,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj); } - /** - * @return Latest serializer version. - */ - public int serializerVersion() { + /** {@inheritDoc} */ + @Override public int serializerVersion() { return serializerVer; } @@ -2304,7 +2302,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @return True if segment is ZIP compressed. */ - public boolean isCompressed() { + @Override public boolean isCompressed() { return file.getName().endsWith(".zip"); } @@ -2376,7 +2374,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @throws IgniteCheckedException If failed to close the WAL segment file. */ - public void close() throws IgniteCheckedException { + @Override public void close() throws IgniteCheckedException { try { fileIO.close(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index de45e3d..4904102 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -30,8 +30,11 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.file.Files; import java.sql.Time; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -44,6 +47,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.zip.ZipEntry; @@ -88,12 +92,14 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; @@ -416,6 +422,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda if (!cctx.kernalContext().clientNode()) { assert archiver != null; + archiver.start(); if (compressor != null) @@ -517,9 +524,48 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** - * @return Latest serializer version. + * Collect wal segment files from low pointer (include) to high pointer (not include) and reserve low pointer. + * + * @param low Low bound. + * @param high High bound. */ - public int serializerVersion() { + public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException { + final long awaitIdx = high.index() - 1; + + while (archiver.lastArchivedAbsoluteIndex() < awaitIdx) + LockSupport.parkNanos(Thread.currentThread(), 1_000_000); + + if (!reserve(low)) + throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]"); + + List<File> res = new ArrayList<>(); + + for (long i = low.index(); i < high.index(); i++) { + String segmentName = FileWriteAheadLogManager.FileDescriptor.fileName(i); + + File file = new File(walArchiveDir, segmentName); + File fileZip = new File(walArchiveDir, segmentName + ".zip"); + + if (file.exists()) + res.add(file); + else if (fileZip.exists()) + res.add(fileZip); + else { + if (log.isInfoEnabled()) { + log.info("Segment not found: " + file.getName() + "/" + fileZip.getName()); + + log.info("Stopped iteration on idx: " + i); + } + + break; + } + } + + return res; + } + + /** {@inheritDoc}*/ + @Override public int serializerVersion() { return serializerVersion; } @@ -572,7 +618,13 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda // Need to calculate record size first. record.size(serializer.size(record)); - for (; ; currWrHandle = rollOver(currWrHandle)) { + while (true) { + if (record.rollOver()){ + assert cctx.database().checkpointLockIsHeldByThread(); + + currWrHandle = rollOver(currWrHandle); + } + WALPointer ptr = currWrHandle.addRecord(record); if (ptr != null) { @@ -585,6 +637,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda return ptr; } + else + currWrHandle = rollOver(currWrHandle); checkNode(); @@ -1040,7 +1094,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda createFile(first); } else - checkFiles(0, false, null); + checkFiles(0, false, null, null); } /** @@ -1188,6 +1242,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda */ private Map<Long, Integer> locked = new HashMap<>(); + /** Formatted index. */ + private int formatted; + /** * */ @@ -1285,8 +1342,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda while (curAbsWalIdx == -1 && !stopped) wait(); - if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1) - changeLastArchivedIndexAndWakeupCompressor(curAbsWalIdx - 1); + // If the archive directory is empty, we can be sure that there were no WAL segments archived. + // This is ensured by the check in truncate() which will leave at least one file there + // once it was archived. } while (!Thread.currentThread().isInterrupted() && !stopped) { @@ -1373,7 +1431,13 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda // Notify archiver thread. notifyAll(); - while (curAbsWalIdx - lastAbsArchivedIdx > dsCfg.getWalSegments() && cleanException == null) + int segments = dsCfg.getWalSegments(); + + while ((curAbsWalIdx - lastAbsArchivedIdx > segments && cleanException == null)) + wait(); + + // Wait for formatter so that we do not open an empty file in DEFAULT mode. + while (curAbsWalIdx % dsCfg.getWalSegments() > formatted) wait(); return curAbsWalIdx; @@ -1502,11 +1566,23 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * {@link FsyncModeFileWriteAheadLogManager#checkOrPrepareFiles()} */ private void allocateRemainingFiles() throws IgniteCheckedException { - checkFiles(1, true, new IgnitePredicate<Integer>() { - @Override public boolean apply(Integer integer) { - return !checkStop(); - } - }); + final FileArchiver archiver = this; + + checkFiles(1, + true, + new IgnitePredicate<Integer>() { + @Override public boolean apply(Integer integer) { + return !checkStop(); + } + }, new CI1<Integer>() { + @Override public void apply(Integer idx) { + synchronized (archiver) { + formatted = idx; + + archiver.notifyAll(); + } + } + }); } } @@ -1821,7 +1897,12 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * @param p Predicate Exit condition. * @throws IgniteCheckedException if validation or create file fail. */ - private void checkFiles(int startWith, boolean create, IgnitePredicate<Integer> p) throws IgniteCheckedException { + private void checkFiles( + int startWith, + boolean create, + @Nullable IgnitePredicate<Integer> p, + @Nullable IgniteInClosure<Integer> completionCallback + ) throws IgniteCheckedException { for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) { File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); @@ -1835,6 +1916,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } else if (create) createFile(checkFile); + + if (completionCallback != null) + completionCallback.apply(i); } } @@ -2133,7 +2217,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @throws IgniteCheckedException If failed to close the WAL segment file. */ - public void close() throws IgniteCheckedException { + @Override public void close() throws IgniteCheckedException { try { fileIO.close(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 6a35c8a..9b2a206 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -42,6 +42,11 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ + @Override public int serializerVersion() { + return 0; + } + + /** {@inheritDoc} */ @Override public void resumeLogging(WALPointer ptr) throws IgniteCheckedException { // No-op. }