Repository: ignite
Updated Branches:
  refs/heads/ignite-10044 e0657d718 -> 94ec2f892


IGNITE-10079 Fixed WAL segments compression bug when FileWriteAheadLogManager 
returns invalid lastCompactedSegment

Signed-off-by: Andrey Gura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a09d546
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a09d546
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a09d546

Branch: refs/heads/ignite-10044
Commit: 2a09d54625c62acbb05ee71fb98c513b5e2c3183
Parents: 7920646
Author: Andrey Kuznetsov <[email protected]>
Authored: Thu Nov 29 14:54:43 2018 +0300
Committer: Andrey Gura <[email protected]>
Committed: Thu Nov 29 14:54:43 2018 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           |  34 ++--
 .../wal/aware/SegmentArchivedStorage.java       |  16 ++
 .../persistence/wal/aware/SegmentAware.java     |  13 +-
 .../wal/aware/SegmentCompressStorage.java       |   7 +-
 .../db/wal/WalCompactionAfterRestartTest.java   | 161 +++++++++++++++++++
 .../persistence/wal/aware/SegmentAwareTest.java |  10 +-
 6 files changed, 211 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index b56b64f..fad1ec1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1932,7 +1932,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             boolean reserved = reserve(new FileWALPointer(segmentToCompress, 
0, 0));
 
-            return reserved ? segmentToCompress : -1;
+            if (reserved)
+                return segmentToCompress;
+            else {
+                segmentAware.onSegmentCompressed(segmentToCompress);
+
+                return -1;
+            }
         }
 
         /** {@inheritDoc} */
@@ -1946,9 +1952,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 long segIdx = -1L;
 
                 try {
-                    segIdx = tryReserveNextSegmentOrWait();
-
-                    if (segIdx <= segmentAware.lastCompressedIdx())
+                    if ((segIdx = tryReserveNextSegmentOrWait()) == -1)
                         continue;
 
                     deleteObsoleteRawSegments();
@@ -1967,21 +1971,19 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                     Files.move(tmpZip.toPath(), zip.toPath());
 
-                    if (mode != WALMode.NONE) {
-                        try (FileIO f0 = ioFactory.create(zip, CREATE, READ, 
WRITE)) {
-                            f0.force();
-                        }
-
-                        if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && 
!cctx.kernalContext().recoveryMode()) {
-                            evt.record(new WalSegmentCompactedEvent(
-                                    cctx.localNode(),
-                                    segIdx,
-                                    zip.getAbsoluteFile())
-                            );
-                        }
+                    try (FileIO f0 = ioFactory.create(zip, CREATE, READ, 
WRITE)) {
+                        f0.force();
                     }
 
                     segmentAware.onSegmentCompressed(segIdx);
+
+                    if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && 
!cctx.kernalContext().recoveryMode()) {
+                        evt.record(new WalSegmentCompactedEvent(
+                                cctx.localNode(),
+                                segIdx,
+                                zip.getAbsoluteFile())
+                        );
+                    }
                 }
                 catch (IgniteInterruptedCheckedException ignore) {
                     Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index c526ae1..e31628f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -34,6 +34,8 @@ class SegmentArchivedStorage extends SegmentObservable {
      * no segments archived.
      */
     private volatile long lastAbsArchivedIdx = -1;
+    /** Latest truncated segment. */
+    private volatile long lastTruncatedArchiveIdx = -1;
 
     /**
      * @param segmentLockStorage Protects WAL work segments from moving.
@@ -136,4 +138,18 @@ class SegmentArchivedStorage extends SegmentObservable {
     private synchronized void onSegmentUnlocked(long segmentId) {
         notifyAll();
     }
+
+    /**
+     * @param lastTruncatedArchiveIdx Last truncated segment.
+     */
+    void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
+        this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
+    }
+
+    /**
+     * @return Last truncated segment.
+     */
+    long lastTruncatedArchiveIdx() {
+        return lastTruncatedArchiveIdx;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index e46d93f..a14f0ca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -27,8 +27,6 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.
  * Holder of actual information of latest manipulation on WAL segments.
  */
 public class SegmentAware {
-    /** Latest truncated segment. */
-    private volatile long lastTruncatedArchiveIdx = -1L;
     /** Segment reservations storage: Protects WAL segments from deletion 
during WAL log cleanup. */
     private final SegmentReservationStorage reservationStorage = new 
SegmentReservationStorage();
     /** Lock on segment protects from archiving segment. */
@@ -106,7 +104,12 @@ public class SegmentAware {
      * there's no segment to archive right now.
      */
     public long waitNextSegmentToCompress() throws 
IgniteInterruptedCheckedException {
-        return Math.max(segmentCompressStorage.nextSegmentToCompressOrWait(), 
lastTruncatedArchiveIdx + 1);
+        long idx;
+
+        while ((idx = segmentCompressStorage.nextSegmentToCompressOrWait()) <= 
lastTruncatedArchiveIdx())
+            onSegmentCompressed(idx);
+
+        return idx;
     }
 
     /**
@@ -152,14 +155,14 @@ public class SegmentAware {
      * @param lastTruncatedArchiveIdx Last truncated segment;
      */
     public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
-        this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
+        
segmentArchivedStorage.lastTruncatedArchiveIdx(lastTruncatedArchiveIdx);
     }
 
     /**
      * @return Last truncated segment.
      */
     public long lastTruncatedArchiveIdx() {
-        return lastTruncatedArchiveIdx;
+        return segmentArchivedStorage.lastTruncatedArchiveIdx();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 174fb46..95d4f4a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -94,6 +94,9 @@ public class SegmentCompressStorage {
             this.lastCompressedIdx = Math.min(lastMaxCompressedIdx, 
compressingSegments.get(0) - 1);
         else
             this.lastCompressedIdx = lastMaxCompressedIdx;
+
+        if (compressedIdx > lastEnqueuedToCompressIdx)
+            lastEnqueuedToCompressIdx = compressedIdx;
     }
 
     /**
@@ -120,9 +123,11 @@ public class SegmentCompressStorage {
 
         Long idx = segmentsToCompress.poll();
 
+        assert idx != null;
+
         compressingSegments.add(idx);
 
-        return idx == null ? -1L : idx;
+        return idx;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
new file mode 100644
index 0000000..3685fe7
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
@@ -0,0 +1,161 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.WalSegmentCompactedEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
+
+/** */
+public class WalCompactionAfterRestartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)
+                .setMaxSize(200L * 1024 * 1024))
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalSegmentSize(512 * 1024)
+            .setWalCompactionEnabled(true)
+            .setMaxWalArchiveSize(2 * 512 * 1024)
+        );
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(DEFAULT_CACHE_NAME);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+        ccfg.setBackups(0);
+
+        cfg.setCacheConfiguration(ccfg);
+        cfg.setConsistentId(name);
+
+        cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void test() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        doCachePuts(ig, 10_000);
+
+        ig.cluster().active(false);
+
+        stopGrid(0);
+
+        IgniteEx ig0 = startGrid(0);
+
+        ig0.cluster().active(true);
+
+        List<IgniteBiTuple<Long, Long>> discrepancies = 
Collections.synchronizedList(new ArrayList<>());
+
+        ig0.events().localListen(e -> {
+            long evtSegIdx = 
((WalSegmentCompactedEvent)e).getAbsWalSegmentIdx();
+            long lastCompactedIdx = 
ig0.context().cache().context().wal().lastCompactedSegment();
+
+            if (lastCompactedIdx < 0 || lastCompactedIdx > evtSegIdx)
+                discrepancies.add(F.t(evtSegIdx, lastCompactedIdx));
+
+            return true;
+        }, EVT_WAL_SEGMENT_COMPACTED);
+
+        doCachePuts(ig0, 5_000);
+
+        stopGrid(0);
+
+        if (!discrepancies.isEmpty()) {
+            fail("Discrepancies (EVT_WAL_SEGMENT_COMPACTED index vs. 
lastCompactedSegment):" + System.lineSeparator() +
+                discrepancies.stream()
+                    .map(t -> String.format("%d <-> %d", t.get1(), t.get2()))
+                    .collect(Collectors.joining(System.lineSeparator())));
+        }
+    }
+
+    /** */
+    private void doCachePuts(IgniteEx ig, long millis) throws 
IgniteCheckedException {
+        IgniteCache<Integer, byte[]> cache = 
ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<Long> putFut = 
GridTestUtils.runMultiThreadedAsync(() -> {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (!stop.get())
+                cache.put(rnd.nextInt(), "Ignite".getBytes());
+        }, 4, "cache-filler");
+
+        U.sleep(millis);
+
+        stop.set(true);
+
+        putFut.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 0869356..d651e01 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -438,18 +438,12 @@ public class SegmentAwareTest extends TestCase {
      * Next segment for compress based on truncated archive idx.
      */
     public void testCorrectCalculateNextCompressSegment() throws 
IgniteCheckedException, InterruptedException {
-        //given: thread which awaited segment.
         SegmentAware aware = new SegmentAware(10, true);
 
-        aware.onSegmentCompressed(5);
         aware.setLastArchivedAbsoluteIndex(6);
-        aware.lastTruncatedArchiveIdx(7);
-
-        //when:
-        long segmentToCompress = aware.waitNextSegmentToCompress();
 
-        //then: segment to compress greater than truncated archive idx
-        assertEquals(8, segmentToCompress);
+        for (int exp = 0; exp <= 6; exp++)
+            assertEquals(exp, aware.waitNextSegmentToCompress());
     }
 
     /**

Reply via email to