This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f79ea9aa642 IGNITE-17719 IgnitePdsThreadInterruptionTest#testInterruptsOnWALWrite hangs (#10417)
f79ea9aa642 is described below

commit f79ea9aa642308b8c9aa0cab2536b8ec35909109
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Dec 2 12:18:23 2022 +0300

    IGNITE-17719 IgnitePdsThreadInterruptionTest#testInterruptsOnWALWrite hangs (#10417)
---
 .../persistence/wal/FileWriteAheadLogManager.java  |  30 ++-
 .../wal/aware/SegmentCurrentStateStorage.java      |  10 +-
 .../db/file/IgnitePdsThreadInterruptionTest.java   | 213 +++++++++------------
 3 files changed, 124 insertions(+), 129 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 868871288e6..90a9b30d534 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1708,9 +1708,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         long absNextIdxStartTime = System.nanoTime();
 
-        // Signal to archiver that we are done with the segment and it can be 
archived.
+        // Signal to archiver that we are done with the segment, and it can be 
archived.
         long absNextIdx = archiver0.nextAbsoluteSegmentIndex();
 
+        assert absNextIdx == curIdx + 1 : "curIdx=" + curIdx + ", nextIdx=" + 
absNextIdx;
+
         long absNextIdxWaitTime = U.nanosToMillis(System.nanoTime() - 
absNextIdxStartTime);
 
         if (absNextIdxWaitTime > THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT) {
@@ -2028,7 +2030,31 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 throw cleanErr;
 
             try {
-                long nextIdx = segmentAware.nextAbsoluteSegmentIndex();
+                long nextIdx;
+
+                boolean interrupted = false;
+
+                while (true) {
+                    try {
+                        nextIdx = segmentAware.nextAbsoluteSegmentIndex();
+
+                        break;
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        if (isCancelled.get()) {
+                            // Archiver will soon complete its work, so we can 
not wait for the segment to be archived.
+                            throw e;
+                        }
+                        else {
+                            // It is assumed that the interruption of the 
thread came for example from a user thread,
+                            // which should not interrupt write to the WAL, so 
we should just try again.
+                            interrupted = true;
+                        }
+                    }
+                }
+
+                if (interrupted)
+                    Thread.currentThread().interrupt();
 
                 synchronized (this) {
                     // Wait for formatter so that we do not open an empty file 
in DEFAULT mode.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
index 6672879b1a6..1883d2ae3d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
@@ -88,12 +88,8 @@ class SegmentCurrentStateStorage extends SegmentObservable {
         long nextAbsIdx;
 
         synchronized (this) {
-            curAbsWalIdx++;
-
-            notifyAll();
-
             try {
-                while (curAbsWalIdx - lastAbsArchivedIdx > walSegmentsCnt && 
!forceInterrupted)
+                while ((curAbsWalIdx + 1) - lastAbsArchivedIdx > 
walSegmentsCnt && !forceInterrupted)
                     wait();
             }
             catch (InterruptedException e) {
@@ -103,7 +99,9 @@ class SegmentCurrentStateStorage extends SegmentObservable {
             if (forceInterrupted)
                 throw new IgniteInterruptedCheckedException("Interrupt waiting 
of change archived idx");
 
-            nextAbsIdx = curAbsWalIdx;
+            nextAbsIdx = ++curAbsWalIdx;
+
+            notifyAll();
         }
 
         notifyObservers(nextAbsIdx);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
index 076db8fdeda..e36a35d0f90 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db.file;
 
-import java.util.HashSet;
-import java.util.Map;
+import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
@@ -32,17 +34,20 @@ import 
org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.util.IgniteUtils.MB;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+
 /**
  * Test what interruptions of writing threads do not affect PDS.
  */
@@ -50,15 +55,6 @@ public class IgnitePdsThreadInterruptionTest extends 
GridCommonAbstractTest {
     /** */
     public static final int THREADS_CNT = 100;
 
-    /** */
-    private static final int VAL_LEN = 8192;
-
-    /** */
-    private static final byte[] PAYLOAD = new byte[VAL_LEN];
-
-    /** */
-    private volatile boolean stop;
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -66,13 +62,14 @@ public class IgnitePdsThreadInterruptionTest extends 
GridCommonAbstractTest {
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setWalMode(WALMode.LOG_ONLY)
             .setWalFsyncDelayNanos(0)
-            .setWalSegmentSize(1024 * 1024)
+            .setWalSegmentSize((int)MB)
+            .setWalSegments(2)
             .setFileIOFactory(new AsyncFileIOFactory())
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration()
                     .setPersistenceEnabled(true)
-                    .setInitialSize(10L * 1024L * 1024L)
-                    .setMaxSize(10L * 1024L * 1024L)
+                    .setInitialSize(10L * MB)
+                    .setMaxSize(10L * MB)
             ));
 
         cfg.setCacheConfiguration(
@@ -83,20 +80,29 @@ public class IgnitePdsThreadInterruptionTest extends 
GridCommonAbstractTest {
         return cfg;
     }
 
-    /** */
-    @Before
-    public void before() throws Exception {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
         cleanPersistenceDir();
     }
 
-    /** */
-    @After
-    public void after() throws Exception {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
         stopAllGrids();
 
         cleanPersistenceDir();
     }
 
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String 
igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
     /**
      * Tests interruptions on read.
      *
@@ -104,84 +110,67 @@ public class IgnitePdsThreadInterruptionTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testInterruptsOnRead() throws Exception {
-        Ignite ignite = startGrid();
+        Ignite ignite = startGrid(0);
 
-        ignite.cluster().active(true);
+        ignite.cluster().state(ACTIVE);
 
-        int maxKey = 10_000;
+        int keyCount = 10_000;
 
-        Set<Integer> keysToCheck = new HashSet<>();
-
-        Thread[] workers = new Thread[THREADS_CNT];
+        byte[] value = new byte[8_192];
 
         // Load data.
         try (IgniteDataStreamer<Integer, byte[]> st = 
ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
             st.allowOverwrite(true);
 
-            for (int i = 0; i < maxKey; i++) {
-                keysToCheck.add(i);
-
-                st.addData(i, PAYLOAD);
-            }
+            for (int i = 0; i < keyCount; i++)
+                st.addData(i, value);
         }
 
         IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-        AtomicReference<Throwable> fail = new AtomicReference<>();
+        Collection<Throwable> readThreadsError = new ConcurrentLinkedQueue<>();
 
-        for (int i = 0; i < workers.length; i++) {
-            workers[i] = new Thread(() -> 
cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5)));
+        CountDownLatch startThreadsLatch = new CountDownLatch(THREADS_CNT);
 
-            workers[i].setName("reader-" + i);
+        Thread[] workers = new Thread[THREADS_CNT];
 
-            workers[i].setUncaughtExceptionHandler((t, e) -> {
-                // We can get IgniteInterruptedException on 
GridCacheAdapter.asyncOpsSem if thread was interrupted
-                // before asyncOpsSem.acquire().
-                if (!X.hasCause(e,
-                    "Failed to wait for asynchronous operation permit",
-                    IgniteInterruptedException.class)) {
-                    fail.compareAndSet(null, e);
-                }
-            });
+        for (int i = 0; i < workers.length; i++) {
+            workers[i] = new Thread(
+                () -> {
+                    try {
+                        startThreadsLatch.countDown();
+
+                        cache.get(ThreadLocalRandom.current().nextInt(keyCount 
/ 5));
+                    }
+                    catch (Throwable throwable) {
+                        if (!X.hasCause(
+                            throwable,
+                            "Failed to wait for asynchronous operation permit",
+                            IgniteInterruptedException.class
+                        ))
+                            readThreadsError.add(throwable);
+                    }
+                },
+                "cache-reader-from-test" + i
+            );
         }
 
         for (Thread worker : workers)
             worker.start();
 
-        // Interrupts should not affect reads.
-        for (int i = 0; i < workers.length / 2; i++)
-            workers[i].interrupt();
-
-        U.sleep(3_000);
+        assertTrue(startThreadsLatch.await(3, TimeUnit.SECONDS));
 
+        // Interrupts should not affect reads.
         for (Thread worker : workers)
-            worker.join();
-
-        Throwable t = fail.get();
-
-        assertNull(t);
-
-        int verifiedKeys = 0;
-
-        // Get all keys.
-        Map<Integer, byte[]> res = cache.getAll(keysToCheck);
-
-        Assert.assertEquals(maxKey, keysToCheck.size());
-        Assert.assertEquals(maxKey, res.size());
-
-        // Post check.
-        for (Integer key: keysToCheck) {
-            byte[] val = res.get(key);
-
-            assertNotNull(val);
-            assertEquals("Illegal length", VAL_LEN, val.length);
+            worker.interrupt();
 
-            verifiedKeys++;
-        }
+        for (Thread worker : workers)
+            worker.join(TimeUnit.SECONDS.toMillis(1));
 
-        Assert.assertEquals(maxKey, verifiedKeys);
+        assertThat(readThreadsError, empty());
 
-        log.info("Verified keys: " + verifiedKeys);
+        for (int i = 0; i < keyCount; i++)
+            assertArrayEquals(String.valueOf(i), cache.get(i), value);
     }
 
     /**
@@ -190,40 +179,40 @@ public class IgnitePdsThreadInterruptionTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-17719";)
     public void testInterruptsOnWALWrite() throws Exception {
-        Ignite ignite = startGrid();
+        Ignite ignite = startGrid(0);
 
-        ignite.cluster().active(true);
+        ignite.cluster().state(ACTIVE);
 
-        int maxKey = 100_000;
+        IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
         Set<Integer> keysToCheck = new GridConcurrentHashSet<>();
 
-        Thread[] workers = new Thread[THREADS_CNT];
+        Collection<Throwable> writeThreadsError = new 
ConcurrentLinkedQueue<>();
 
-        AtomicReference<Throwable> fail = new AtomicReference<>();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        byte[] value = new byte[8_192];
+
+        Thread[] workers = new Thread[THREADS_CNT];
 
         for (int i = 0; i < workers.length; i++) {
             workers[i] = new Thread(() -> {
-                IgniteCache<Object, Object> cache = 
ignite.cache(DEFAULT_CACHE_NAME);
+                try {
+                    while (!stop.get()) {
+                        int key = ThreadLocalRandom.current().nextInt(100_000);
 
-                while (!stop) {
-                    int key = ThreadLocalRandom.current().nextInt(maxKey);
+                        cache.put(key, value);
 
-                    cache.put(key, PAYLOAD);
-
-                    keysToCheck.add(key);
+                        keysToCheck.add(key);
+                    }
+                }
+                catch (Throwable throwable) {
+                    writeThreadsError.add(throwable);
                 }
-            });
+            }, "cache-writer-from-test" + i);
 
             workers[i].setName("writer-" + i);
-
-            workers[i].setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
-                @Override public void uncaughtException(Thread t, Throwable e) 
{
-                    fail.compareAndSet(null, e);
-                }
-            });
         }
 
         for (Thread worker : workers)
@@ -237,33 +226,15 @@ public class IgnitePdsThreadInterruptionTest extends 
GridCommonAbstractTest {
 
         Thread.sleep(3_000);
 
-        stop = true;
+        stop.set(true);
 
         for (Thread worker : workers)
-            worker.join();
-
-        Throwable t = fail.get();
-
-        assertNull(t);
-
-        IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
-
-        int verifiedKeys = 0;
+            worker.join(TimeUnit.SECONDS.toMillis(1));
 
-        Map<Integer, byte[]> res = cache.getAll(keysToCheck);
-
-        Assert.assertEquals(res.size(), keysToCheck.size());
+        assertThat(writeThreadsError, empty());
 
         // Post check.
-        for (Integer key: keysToCheck) {
-            byte[] val = res.get(key);
-
-            assertNotNull(val);
-            assertEquals("Illegal length", VAL_LEN, val.length);
-
-            verifiedKeys++;
-        }
-
-        log.info("Verified keys: " + verifiedKeys);
+        for (Integer key: keysToCheck)
+            assertArrayEquals(String.valueOf(key), value, cache.get(key));
     }
 }

Reply via email to