Repository: ignite Updated Branches: refs/heads/master 02739a57c -> c2f09c1c8
IGNITE-7017 Added an option to write WAL directly to the archive directory - Fixes #3140. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2f09c1c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2f09c1c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2f09c1c Branch: refs/heads/master Commit: c2f09c1c8430109f0994095d480f556fc685ee20 Parents: 02739a5 Author: dpavlov <[email protected]> Authored: Mon Jan 29 17:44:11 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Jan 29 17:44:11 2018 +0300 ---------------------------------------------------------------------- .../configuration/DataRegionConfiguration.java | 2 +- .../wal/FileWriteAheadLogManager.java | 225 +++++++++---------- .../persistence/wal/SegmentArchivedMonitor.java | 64 ++++++ .../wal/SegmentReservationStorage.java | 61 +++++ .../wal/IgniteWalHistoryReservationsTest.java | 24 +- .../db/wal/reader/IgniteWalReaderTest.java | 27 ++- 6 files changed, 266 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f09c1c/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java index 2950e9c..52cb764 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java @@ -57,7 +57,7 @@ import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA * <bean class="org.apache.ignite.configuration.DataRegionConfiguration"> * <property name="name" value="25MB_Region_Swapping"/> * <property name="initialSize" value="#{25 * 1024 * 1024}"/> - * <property name="initialSize" value="#{100 * 1024 * 1024}"/> + * <property name="maxSize" value="#{100 * 1024 * 1024}"/> * <property name="swapPath" value="db/swap"/> * </bean> * </list> http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f09c1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 73751c6..6b8be7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -41,8 +41,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -271,15 +269,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Factory to provide I/O interfaces for read/write operations with files */ private FileIOFactory ioFactory; - /** Next segment archived monitor. */ - private final Object nextSegmentArchivedMonitor = new Object(); + /** Next WAL segment archived monitor. Manages last archived index, emulates archivation in no-archiver mode. */ + private final SegmentArchivedMonitor archivedMonitor = new SegmentArchivedMonitor(); + + /** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */ + private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage(); /** Updater for {@link #currHnd}, used for verify there are no concurrent update for current log segment handle */ private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> CURR_HND_UPD = AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currHnd"); - /** */ - private volatile FileArchiver archiver; + /** + * File archiver moves segments from work directory to archive. Locked segments may be kept not moved until + * release. For mode archive and work folders set to equal value, archiver is not created. + */ + @Nullable private volatile FileArchiver archiver; /** Compressor. */ private volatile FileCompressor compressor; @@ -386,7 +390,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1; - archiver = new FileArchiver(tup == null ? -1 : tup.get2()); + long lastAbsArchivedIdx = tup == null ? -1 : tup.get2(); + + if (isArchiverEnabled()) + archiver = new FileArchiver(lastAbsArchivedIdx); + else + archiver = null; + + if (lastAbsArchivedIdx > 0) + archivedMonitor.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); if (dsCfg.isWalCompactionEnabled()) { compressor = new FileCompressor(); @@ -408,14 +420,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Archiver can be not created, all files will be written to WAL folder, using absolute segment index. * + * @return flag indicating if archiver is disabled. */ - public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException { - FileArchiver archiver0 = archiver; + private boolean isArchiverEnabled() { + if (walArchiveDir != null && walWorkDir != null) + return !walArchiveDir.equals(walWorkDir); + + return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath())); + } + /** + * + */ + public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException { final long awaitIdx = high.index() - 1; - awaitSegmentArchived(archiver0, awaitIdx); + archivedMonitor.awaitSegmentArchived(awaitIdx); if (!reserve(low)) throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]"); @@ -439,23 +461,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param archiver0 Archiver. - * @param awaitIdx Method will wait archivation of that index. - */ - private void awaitSegmentArchived(FileArchiver archiver0, long awaitIdx) throws IgniteInterruptedCheckedException { - synchronized (nextSegmentArchivedMonitor) { - while (archiver0.lastArchivedAbsoluteIndex() < awaitIdx) { - try { - nextSegmentArchivedMonitor.wait(2000); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); - } - } - } - } - - /** * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa) */ private void checkWalConfiguration() throws IgniteCheckedException { @@ -517,8 +522,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl start0(); if (!cctx.kernalContext().clientNode()) { - assert archiver != null; - archiver.start(); + if (isArchiverEnabled()) { + assert archiver != null; + + archiver.start(); + } if (compressor != null) compressor.start(); @@ -781,15 +789,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (mode == WALMode.NONE) return false; - FileArchiver archiver0 = archiver; - - if (archiver0 == null) - throw new IgniteCheckedException("Could not reserve WAL segment: archiver == null"); - - archiver0.reserve(((FileWALPointer)start).index()); + reservationStorage.reserve(((FileWALPointer)start).index()); if (!hasIndex(((FileWALPointer)start).index())) { - archiver0.release(((FileWALPointer)start).index()); + reservationStorage.release(((FileWALPointer)start).index()); return false; } @@ -804,12 +807,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (mode == WALMode.NONE) return; - FileArchiver archiver0 = archiver; - - if (archiver0 == null) - throw new IgniteCheckedException("Could not release WAL segment: archiver == null"); - - archiver0.release(((FileWALPointer)start).index()); + reservationStorage.release(((FileWALPointer)start).index()); } /** @@ -850,17 +848,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int deleted = 0; - FileArchiver archiver0 = archiver; - for (FileDescriptor desc : descs) { if (lowPtr != null && desc.idx < lowPtr.index()) continue; // Do not delete reserved or locked segment and any segment after it. - if (archiver0 != null && archiver0.reserved(desc.idx)) + if (segmentReservedOrLocked(desc.idx)) return deleted; - long lastArchived = archiver0 != null ? archiver0.lastArchivedAbsoluteIndex() : lastArchivedIndex(); + long archivedAbsIdx = archivedMonitor.lastArchivedAbsoluteIndex(); + + long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex(); // We need to leave at least one archived segment to correctly determine the archive index. if (desc.idx < highPtr.index() && desc.idx < lastArchived) { @@ -879,6 +877,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return deleted; } + /** + * Check if WAL segment locked (protected from move to archive) or reserved (protected from deletion from WAL + * cleanup). + * + * @param absIdx Absolute WAL segment index for check reservation. + * @return {@code True} if index is locked. + */ + private boolean segmentReservedOrLocked(long absIdx) { + FileArchiver archiver0 = archiver; + + return ((archiver0 != null) && archiver0.locked(absIdx)) + || (reservationStorage.reserved(absIdx)); + + } + /** {@inheritDoc} */ @Override public void allowCompressionUntil(WALPointer ptr) { if (compressor != null) @@ -889,7 +902,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Override public int walArchiveSegments() { long lastTruncated = lastTruncatedArchiveIdx; - long lastArchived = archiver.lastArchivedAbsoluteIndex(); + long lastArchived = archivedMonitor.lastArchivedAbsoluteIndex(); if (lastArchived == -1) return 0; @@ -903,9 +916,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Override public boolean reserved(WALPointer ptr) { FileWALPointer fPtr = (FileWALPointer)ptr; - FileArchiver archiver0 = archiver; - - return archiver0 != null && archiver0.reserved(fPtr.index()); + return segmentReservedOrLocked(fPtr.index()); } /** {@inheritDoc} */ @@ -1050,7 +1061,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException { long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); - long segNo = absIdx % dsCfg.getWalSegments(); + @Nullable FileArchiver archiver0 = archiver; + + long segNo = archiver0 == null ? absIdx : absIdx % dsCfg.getWalSegments(); File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo)); @@ -1110,7 +1123,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ser, rbuf); - archiver.currentWalIndex(absIdx); + if (archiver0 != null) + archiver0.currentWalIndex(absIdx); + else + archivedMonitor.setLastArchivedAbsoluteIndex(absIdx - 1); return hnd; } @@ -1238,9 +1254,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER); - if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments()) - throw new IgniteCheckedException("Failed to initialize wal (work directory contains " + - "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); + if(isArchiverEnabled()) + if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments()) + throw new IgniteCheckedException("Failed to initialize wal (work directory contains " + + "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. if (allFiles.length == 0) { @@ -1253,9 +1270,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Clears the file with zeros. + * Clears the file, fills with zeros for Default mode. * * @param file File to format. + * @throws IgniteCheckedException if formatting failed */ private void formatFile(File file) throws IgniteCheckedException { if (log.isDebugEnabled()) @@ -1317,8 +1335,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws IgniteCheckedException If failed. */ private File pollNextFile(long curIdx) throws IgniteCheckedException { + FileArchiver archiver0 = archiver; + + if (archiver0 == null) { + archivedMonitor.setLastArchivedAbsoluteIndex(curIdx); + + return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1)); + } + // Signal to archiver that we are done with the segment and it can be archived. - long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx); + long absNextIdx = archiver0.nextAbsoluteSegmentIndex(curIdx); long segmentIdx = absNextIdx % dsCfg.getWalSegments(); @@ -1398,12 +1424,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** current thread stopping advice */ private volatile boolean stopped; - /** - * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments - * which >= reserved segment index. - */ - private NavigableMap<Long, Integer> reserved = new TreeMap<>(); - /** Formatted index. */ private int formatted; @@ -1423,13 +1443,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @return Last archived segment absolute index. - */ - private long lastArchivedAbsoluteIndex() { - return lastAbsArchivedIdx; - } - - /** * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. */ private void shutdown() throws IgniteInterruptedCheckedException { @@ -1454,39 +1467,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param absIdx Index for reservation. - */ - private synchronized void reserve(long absIdx) { - Integer cur = reserved.get(absIdx); - - if (cur == null) - reserved.put(absIdx, 1); - else - reserved.put(absIdx, cur + 1); - } - - /** - * Check if WAL segment locked or reserved + * Check if WAL segment locked (protected from move to archive) * * @param absIdx Index for check reservation. - * @return {@code True} if index is reserved. + * @return {@code True} if index is locked. */ - private synchronized boolean reserved(long absIdx) { - return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null; - } - - /** - * @param absIdx Reserved index. - */ - private synchronized void release(long absIdx) { - Integer cur = reserved.get(absIdx); - - assert cur != null && cur >= 1 : cur; - - if (cur == 1) - reserved.remove(absIdx); - else - reserved.put(absIdx, cur - 1); + private synchronized boolean locked(long absIdx) { + return locked.containsKey(absIdx); } /** {@inheritDoc} */ @@ -1577,9 +1564,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (compressor != null) compressor.onNextSegmentArchived(); - synchronized (nextSegmentArchivedMonitor) { - nextSegmentArchivedMonitor.notifyAll(); - } + archivedMonitor.setLastArchivedAbsoluteIndex(idx); } /** @@ -1638,8 +1623,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl " lastAbsArchivedIdx=" + lastAbsArchivedIdx); return true; - } + } Integer cur = locked.get(absIdx); cur = cur == null ? 1 : cur + 1; @@ -1824,7 +1809,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl long segmentToCompress = lastCompressedIdx + 1; synchronized (this) { - while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) { + while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archivedMonitor.lastArchivedAbsoluteIndex())) { wait(); if (stopped) @@ -1845,11 +1830,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private void deleteObsoleteRawSegments() { FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); - FileArchiver archiver0 = archiver; - for (FileDescriptor desc : descs) { // Do not delete reserved or locked segment and any segment after it. - if (archiver0 != null && archiver0.reserved(desc.idx)) + if (segmentReservedOrLocked(desc.idx)) return; if (desc.idx < lastCompressedIdx) { @@ -2852,14 +2835,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final File walArchiveDir; - /** */ - private final FileArchiver archiver; + /** See {@link FileWriteAheadLogManager#archiver}. */ + @Nullable private final FileArchiver archiver; /** */ private final FileDecompressor decompressor; /** */ - private final DataStorageConfiguration psCfg; + private final DataStorageConfiguration dsCfg; /** Optional start pointer. */ @Nullable @@ -2875,9 +2858,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param walArchiveDir WAL archive dir. * @param start Optional start pointer. * @param end Optional end pointer. - * @param psCfg Database configuration. + * @param dsCfg Database configuration. * @param serializerFactory Serializer factory. - * @param archiver Archiver. + * @param archiver File Archiver. * @param decompressor Decompressor. *@param log Logger @throws IgniteCheckedException If failed to initialize WAL segment. */ @@ -2887,10 +2870,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl File walArchiveDir, @Nullable FileWALPointer start, @Nullable FileWALPointer end, - DataStorageConfiguration psCfg, + DataStorageConfiguration dsCfg, @NotNull RecordSerializerFactory serializerFactory, FileIOFactory ioFactory, - FileArchiver archiver, + @Nullable FileArchiver archiver, FileDecompressor decompressor, IgniteLogger log ) throws IgniteCheckedException { @@ -2898,10 +2881,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl cctx, serializerFactory, ioFactory, - psCfg.getWalRecordIteratorBufferSize()); + dsCfg.getWalRecordIteratorBufferSize()); this.walWorkDir = walWorkDir; this.walArchiveDir = walArchiveDir; - this.psCfg = psCfg; + this.dsCfg = dsCfg; this.archiver = archiver; this.start = start; this.end = end; @@ -3014,10 +2997,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); - if (readArchive) - fd = new FileDescriptor(new File(walArchiveDir, FileDescriptor.fileName(curWalSegmIdx))); + if (archiver == null || readArchive) { + fd = new FileDescriptor(new File(walArchiveDir, + FileDescriptor.fileName(curWalSegmIdx))); + } else { - long workIdx = curWalSegmIdx % psCfg.getWalSegments(); + long workIdx = curWalSegmIdx % dsCfg.getWalSegments(); fd = new FileDescriptor( new File(walWorkDir, FileDescriptor.fileName(workIdx)), http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f09c1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java new file mode 100644 index 0000000..81ecd41 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import org.apache.ignite.internal.IgniteInterruptedCheckedException; + +/** + * Next WAL segment archived monitor. Manages last archived index, allows to emulate archivation in no-archiver mode. + * Monitor which is notified each time WAL segment is archived. + */ +class SegmentArchivedMonitor { + /** + * Last archived file absolute index, 0-based. Write is quarded by {@code this}. Negative value indicates there are + * no segments archived. + */ + private volatile long lastAbsArchivedIdx = -1; + + /** + * @return Last archived segment absolute index. + */ + long lastArchivedAbsoluteIndex() { + return lastAbsArchivedIdx; + } + + /** + * @param lastAbsArchivedIdx new value of last archived segment index + */ + synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) { + this.lastAbsArchivedIdx = lastAbsArchivedIdx; + + notifyAll(); + } + + /** + * Method will wait activation of particular WAL segment index. + * + * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true. + * @throws IgniteInterruptedCheckedException if interrupted. + */ + synchronized void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException { + while (lastArchivedAbsoluteIndex() < awaitIdx) { + try { + wait(2000); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f09c1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java new file mode 100644 index 0000000..17da96d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. + */ +class SegmentReservationStorage { + /** + * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments + * which has index >= reserved segment index. Guarded by {@code this}. + */ + private NavigableMap<Long, Integer> reserved = new TreeMap<>(); + + /** + * @param absIdx Index for reservation. + */ + synchronized void reserve(long absIdx) { + reserved.merge(absIdx, 1, (a, b) -> a + b); + } + + /** + * Checks if segment is currently reserved (protected from deletion during WAL cleanup). + * @param absIdx Index for check reservation. + * @return {@code True} if index is reserved. + */ + synchronized boolean reserved(long absIdx) { + return reserved.floorKey(absIdx) != null; + } + + /** + * @param absIdx Reserved index. + */ + synchronized void release(long absIdx) { + Integer cur = reserved.get(absIdx); + + assert cur != null && cur >= 1 : cur; + + if (cur == 1) + reserved.remove(absIdx); + else + reserved.put(absIdx, cur - 1); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f09c1c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java index 66a8aa9..cbdcc95 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java @@ -192,10 +192,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - Object archiver = GridTestUtils.getFieldValue(wal, "archiver"); + Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - synchronized (archiver) { - Map reserved = GridTestUtils.getFieldValue(archiver, "reserved"); + synchronized (reservationStorage) { + Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); if (reserved.isEmpty()) return false; @@ -219,10 +219,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - Object archiver = GridTestUtils.getFieldValue(wal, "archiver"); + Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - synchronized (archiver) { - Map reserved = GridTestUtils.getFieldValue(archiver, "reserved"); + synchronized (reservationStorage) { + Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); if (!reserved.isEmpty()) return false; @@ -402,10 +402,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - Object archiver = GridTestUtils.getFieldValue(wal, "archiver"); + Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - synchronized (archiver) { - Map reserved = GridTestUtils.getFieldValue(archiver, "reserved"); + synchronized (reservationStorage) { + Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); if (reserved.isEmpty()) return false; @@ -431,10 +431,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - Object archiver = GridTestUtils.getFieldValue(wal, "archiver"); + Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - synchronized (archiver) { - Map reserved = GridTestUtils.getFieldValue(archiver, "reserved"); + synchronized (reservationStorage) { + Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); if (!reserved.isEmpty()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f09c1c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java index 63c219b..2f4c8d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java @@ -119,6 +119,9 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { /** Clear properties in afterTest() method. */ private boolean clearProperties; + /** Set WAL and Archive path to same value. */ + private boolean setWalAndArchiveToSameValue; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { final IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -134,7 +137,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { cfg.setIncludeEventTypes(EventType.EVT_WAL_SEGMENT_ARCHIVED); - DataStorageConfiguration memCfg = new DataStorageConfiguration() + DataStorageConfiguration dsCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration().setMaxSize(1024 * 1024 * 1024).setPersistenceEnabled(true)) .setPageSize(PAGE_SIZE) @@ -144,9 +147,24 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { .setWalMode(customWalMode != null ? customWalMode : WALMode.BACKGROUND); if (archiveIncompleteSegmentAfterInactivityMs > 0) - memCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs); + dsCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs); + + final String workDir = U.defaultWorkDirectory(); + final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); + final File wal = new File(db, "wal"); + + if(setWalAndArchiveToSameValue) { + final String walAbsPath = wal.getAbsolutePath(); + + dsCfg.setWalPath(walAbsPath); + dsCfg.setWalArchivePath(walAbsPath); + } else { + dsCfg.setWalPath(wal.getAbsolutePath()); + dsCfg.setWalArchivePath(new File(wal, "archive").getAbsolutePath()); + } + + cfg.setDataStorageConfiguration(dsCfg); - cfg.setDataStorageConfiguration(memCfg); return cfg; } @@ -176,6 +194,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @throws Exception if failed. */ public void testFillWalAndReadRecords() throws Exception { + setWalAndArchiveToSameValue = false; final int cacheObjectsToWrite = 10000; final Ignite ignite0 = startGrid("node0"); @@ -192,7 +211,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { final String workDir = U.defaultWorkDirectory(); final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); final File wal = new File(db, "wal"); - final File walArchive = new File(wal, "archive"); + final File walArchive = setWalAndArchiveToSameValue ? wal : new File(wal, "archive"); final MockWalIteratorFactory mockItFactory = new MockWalIteratorFactory(log, PAGE_SIZE, consistentId, subfolderName, WAL_SEGMENTS); final WALIterator it = mockItFactory.iterator(wal, walArchive);
