IGNITE-9761 Fixed deadlock in WAL manager - Fixes #4890. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd07c835 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd07c835 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd07c835 Branch: refs/heads/ignite-5797 Commit: bd07c83583807714757ed033901bf885c9a77b24 Parents: 4572137 Author: Anton Kalashnikov <[email protected]> Authored: Tue Oct 2 18:26:20 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Oct 2 18:26:20 2018 +0300 ---------------------------------------------------------------------- .../wal/aware/SegmentArchivedStorage.java | 8 ++-- .../persistence/wal/aware/SegmentAware.java | 9 +++++ .../wal/aware/SegmentLockStorage.java | 27 +++++-------- .../wal/aware/SegmentObservable.java | 10 ++--- .../persistence/wal/aware/SegmentAwareTest.java | 42 +++++++++++++++++++- 5 files changed, 70 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bd07c835/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java index 1ed607e..c526ae1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java @@ -63,10 +63,12 @@ class SegmentArchivedStorage extends SegmentObservable { /** * @param lastAbsArchivedIdx New value of last archived segment index. */ - synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) { - this.lastAbsArchivedIdx = lastAbsArchivedIdx; + void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) { + synchronized (this) { + this.lastAbsArchivedIdx = lastAbsArchivedIdx; - notifyAll(); + notifyAll(); + } notifyObservers(lastAbsArchivedIdx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd07c835/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java index 6ba0399..e46d93f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java @@ -220,6 +220,15 @@ public class SegmentAware { } /** + * Visible for test. + * + * @param absIdx Segment absolute index. segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> + */ + void lockWorkSegment(long absIdx) { + segmentLockStorage.lockWorkSegment(absIdx); + } + + /** * @param absIdx Segment absolute index. */ public void releaseWorkSegment(long absIdx) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bd07c835/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java index 2e145e7..f638d4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; /** @@ -29,7 +29,7 @@ public class SegmentLockStorage extends SegmentObservable { * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from * {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>. */ - private Map<Long, Integer> locked = new HashMap<>(); + private Map<Long, Integer> locked = new ConcurrentHashMap<>(); /** * Check if WAL segment locked (protected from move to archive) @@ -37,7 +37,7 @@ public class SegmentLockStorage extends SegmentObservable { * @param absIdx Index for check reservation. * @return {@code True} if index is locked. */ - public synchronized boolean locked(long absIdx) { + public boolean locked(long absIdx) { return locked.containsKey(absIdx); } @@ -47,12 +47,8 @@ public class SegmentLockStorage extends SegmentObservable { * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - synchronized boolean lockWorkSegment(long absIdx) { - Integer cur = locked.get(absIdx); - - cur = cur == null ? 1 : cur + 1; - - locked.put(absIdx, cur); + boolean lockWorkSegment(long absIdx) { + locked.compute(absIdx, (idx, count) -> count == null ? 1 : count + 1); return false; } @@ -61,15 +57,12 @@ public class SegmentLockStorage extends SegmentObservable { * @param absIdx Segment absolute index. */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - synchronized void releaseWorkSegment(long absIdx) { - Integer cur = locked.get(absIdx); - - assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx; + void releaseWorkSegment(long absIdx) { + locked.compute(absIdx, (idx, count) -> { + assert count != null && count >= 1 : "cur=" + count + ", absIdx=" + absIdx; - if (cur == 1) - locked.remove(absIdx); - else - locked.put(absIdx, cur - 1); + return count == 1 ? null : count - 1; + }); notifyObservers(absIdx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd07c835/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java index ba5ad30..3e91504 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; -import java.util.ArrayList; -import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; /** @@ -26,12 +26,12 @@ import java.util.function.Consumer; */ public abstract class SegmentObservable { /** Observers for handle changes of archived index. */ - private final List<Consumer<Long>> observers = new ArrayList<>(); + private final Queue<Consumer<Long>> observers = new ConcurrentLinkedQueue<>(); /** * @param observer Observer for notification about segment's changes. */ - synchronized void addObserver(Consumer<Long> observer) { + void addObserver(Consumer<Long> observer) { observers.add(observer); } @@ -40,7 +40,7 @@ public abstract class SegmentObservable { * * @param segmentId Segment which was been changed. */ - synchronized void notifyObservers(long segmentId) { + void notifyObservers(long segmentId) { observers.forEach(observer -> observer.accept(segmentId)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd07c835/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java index 7840b09..0869356 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java @@ -31,6 +31,46 @@ import static org.junit.Assert.assertThat; * Test for {@link SegmentAware}. */ public class SegmentAwareTest extends TestCase { + + /** + * Checking to avoid deadlock SegmentArchivedStorage.markAsMovedToArchive -> SegmentLockStorage.locked <-> + * SegmentLockStorage.releaseWorkSegment -> SegmentArchivedStorage.onSegmentUnlocked + * + * @throws IgniteCheckedException if failed. + */ + public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException { + SegmentAware aware = new SegmentAware(10, false); + + int iterationCnt = 100_000; + int segmentToHandle = 1; + + IgniteInternalFuture archiverThread = GridTestUtils.runAsync(() -> { + int i = iterationCnt; + + while (i-- > 0) { + try { + aware.markAsMovedToArchive(segmentToHandle); + } + catch (IgniteInterruptedCheckedException e) { + throw new RuntimeException(e); + } + } + }); + + IgniteInternalFuture lockerThread = GridTestUtils.runAsync(() -> { + int i = iterationCnt; + + while (i-- > 0) { + aware.lockWorkSegment(segmentToHandle); + + aware.releaseWorkSegment(segmentToHandle); + } + }); + + archiverThread.get(); + lockerThread.get(); + } + /** * Waiting finished when work segment is set. */ @@ -435,7 +475,7 @@ public class SegmentAwareTest extends TestCase { public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException { SegmentAware aware = new SegmentAware(10, true); - for (int i = 0; i < 5 ; i++) { + for (int i = 0; i < 5; i++) { aware.setLastArchivedAbsoluteIndex(i); aware.waitNextSegmentToCompress(); }
