Repository: ignite Updated Branches: refs/heads/ignite-10044 e0657d718 -> 94ec2f892
IGNITE-10079 Fixed WAL segments compression bug when FileWriteAheadLogManager returns invalid lastCompactedSegment Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a09d546 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a09d546 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a09d546 Branch: refs/heads/ignite-10044 Commit: 2a09d54625c62acbb05ee71fb98c513b5e2c3183 Parents: 7920646 Author: Andrey Kuznetsov <[email protected]> Authored: Thu Nov 29 14:54:43 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Thu Nov 29 14:54:43 2018 +0300 ---------------------------------------------------------------------- .../wal/FileWriteAheadLogManager.java | 34 ++-- .../wal/aware/SegmentArchivedStorage.java | 16 ++ .../persistence/wal/aware/SegmentAware.java | 13 +- .../wal/aware/SegmentCompressStorage.java | 7 +- .../db/wal/WalCompactionAfterRestartTest.java | 161 +++++++++++++++++++ .../persistence/wal/aware/SegmentAwareTest.java | 10 +- 6 files changed, 211 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index b56b64f..fad1ec1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -1932,7 
+1932,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0)); - return reserved ? segmentToCompress : -1; + if (reserved) + return segmentToCompress; + else { + segmentAware.onSegmentCompressed(segmentToCompress); + + return -1; + } } /** {@inheritDoc} */ @@ -1946,9 +1952,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl long segIdx = -1L; try { - segIdx = tryReserveNextSegmentOrWait(); - - if (segIdx <= segmentAware.lastCompressedIdx()) + if ((segIdx = tryReserveNextSegmentOrWait()) == -1) continue; deleteObsoleteRawSegments(); @@ -1967,21 +1971,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Files.move(tmpZip.toPath(), zip.toPath()); - if (mode != WALMode.NONE) { - try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { - f0.force(); - } - - if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) { - evt.record(new WalSegmentCompactedEvent( - cctx.localNode(), - segIdx, - zip.getAbsoluteFile()) - ); - } + try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { + f0.force(); } segmentAware.onSegmentCompressed(segIdx); + + if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) { + evt.record(new WalSegmentCompactedEvent( + cctx.localNode(), + segIdx, + zip.getAbsoluteFile()) + ); + } } catch (IgniteInterruptedCheckedException ignore) { Thread.currentThread().interrupt(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java index c526ae1..e31628f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java @@ -34,6 +34,8 @@ class SegmentArchivedStorage extends SegmentObservable { * no segments archived. */ private volatile long lastAbsArchivedIdx = -1; + /** Latest truncated segment. */ + private volatile long lastTruncatedArchiveIdx = -1; /** * @param segmentLockStorage Protects WAL work segments from moving. @@ -136,4 +138,18 @@ class SegmentArchivedStorage extends SegmentObservable { private synchronized void onSegmentUnlocked(long segmentId) { notifyAll(); } + + /** + * @param lastTruncatedArchiveIdx Last truncated segment. + */ + void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) { + this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx; + } + + /** + * @return Last truncated segment. 
+ */ + long lastTruncatedArchiveIdx() { + return lastTruncatedArchiveIdx; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java index e46d93f..a14f0ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java @@ -27,8 +27,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.aware. * Holder of actual information of latest manipulation on WAL segments. */ public class SegmentAware { - /** Latest truncated segment. */ - private volatile long lastTruncatedArchiveIdx = -1L; /** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */ private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage(); /** Lock on segment protects from archiving segment. */ @@ -106,7 +104,12 @@ public class SegmentAware { * there's no segment to archive right now. 
*/ public long waitNextSegmentToCompress() throws IgniteInterruptedCheckedException { - return Math.max(segmentCompressStorage.nextSegmentToCompressOrWait(), lastTruncatedArchiveIdx + 1); + long idx; + + while ((idx = segmentCompressStorage.nextSegmentToCompressOrWait()) <= lastTruncatedArchiveIdx()) + onSegmentCompressed(idx); + + return idx; } /** @@ -152,14 +155,14 @@ public class SegmentAware { * @param lastTruncatedArchiveIdx Last truncated segment; */ public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) { - this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx; + segmentArchivedStorage.lastTruncatedArchiveIdx(lastTruncatedArchiveIdx); } /** * @return Last truncated segment. */ public long lastTruncatedArchiveIdx() { - return lastTruncatedArchiveIdx; + return segmentArchivedStorage.lastTruncatedArchiveIdx(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java index 174fb46..95d4f4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java @@ -94,6 +94,9 @@ public class SegmentCompressStorage { this.lastCompressedIdx = Math.min(lastMaxCompressedIdx, compressingSegments.get(0) - 1); else this.lastCompressedIdx = lastMaxCompressedIdx; + + if (compressedIdx > lastEnqueuedToCompressIdx) + lastEnqueuedToCompressIdx = compressedIdx; } /** @@ -120,9 +123,11 @@ public class SegmentCompressStorage { 
Long idx = segmentsToCompress.poll(); + assert idx != null; + compressingSegments.add(idx); - return idx == null ? -1L : idx; + return idx; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java new file mode 100644 index 0000000..3685fe7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java @@ -0,0 +1,161 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.events.WalSegmentCompactedEvent; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED; + +/** */ +public class WalCompactionAfterRestartTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + 
cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(200L * 1024 * 1024)) + .setWalMode(WALMode.LOG_ONLY) + .setWalSegmentSize(512 * 1024) + .setWalCompactionEnabled(true) + .setMaxWalArchiveSize(2 * 512 * 1024) + ); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(DEFAULT_CACHE_NAME); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + ccfg.setBackups(0); + + cfg.setCacheConfiguration(ccfg); + cfg.setConsistentId(name); + + cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. 
+ */ + public void test() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + doCachePuts(ig, 10_000); + + ig.cluster().active(false); + + stopGrid(0); + + IgniteEx ig0 = startGrid(0); + + ig0.cluster().active(true); + + List<IgniteBiTuple<Long, Long>> discrepancies = Collections.synchronizedList(new ArrayList<>()); + + ig0.events().localListen(e -> { + long evtSegIdx = ((WalSegmentCompactedEvent)e).getAbsWalSegmentIdx(); + long lastCompactedIdx = ig0.context().cache().context().wal().lastCompactedSegment(); + + if (lastCompactedIdx < 0 || lastCompactedIdx > evtSegIdx) + discrepancies.add(F.t(evtSegIdx, lastCompactedIdx)); + + return true; + }, EVT_WAL_SEGMENT_COMPACTED); + + doCachePuts(ig0, 5_000); + + stopGrid(0); + + if (!discrepancies.isEmpty()) { + fail("Discrepancies (EVT_WAL_SEGMENT_COMPACTED index vs. lastCompactedSegment):" + System.lineSeparator() + + discrepancies.stream() + .map(t -> String.format("%d <-> %d", t.get1(), t.get2())) + .collect(Collectors.joining(System.lineSeparator()))); + } + } + + /** */ + private void doCachePuts(IgniteEx ig, long millis) throws IgniteCheckedException { + IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME); + + AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<Long> putFut = GridTestUtils.runMultiThreadedAsync(() -> { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) + cache.put(rnd.nextInt(), "Ignite".getBytes()); + }, 4, "cache-filler"); + + U.sleep(millis); + + stop.set(true); + + putFut.get(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java index 0869356..d651e01 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java @@ -438,18 +438,12 @@ public class SegmentAwareTest extends TestCase { * Next segment for compress based on truncated archive idx. */ public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException { - //given: thread which awaited segment. SegmentAware aware = new SegmentAware(10, true); - aware.onSegmentCompressed(5); aware.setLastArchivedAbsoluteIndex(6); - aware.lastTruncatedArchiveIdx(7); - - //when: - long segmentToCompress = aware.waitNextSegmentToCompress(); - //then: segment to compress greater than truncated archive idx - assertEquals(8, segmentToCompress); + for (int exp = 0; exp <= 6; exp++) + assertEquals(exp, aware.waitNextSegmentToCompress()); } /**
