This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f79ea9aa642 IGNITE-17719 IgnitePdsThreadInterruptionTest#testInterruptsOnWALWrite hangs (#10417)
f79ea9aa642 is described below
commit f79ea9aa642308b8c9aa0cab2536b8ec35909109
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Dec 2 12:18:23 2022 +0300
IGNITE-17719 IgnitePdsThreadInterruptionTest#testInterruptsOnWALWrite hangs (#10417)
---
.../persistence/wal/FileWriteAheadLogManager.java | 30 ++-
.../wal/aware/SegmentCurrentStateStorage.java | 10 +-
.../db/file/IgnitePdsThreadInterruptionTest.java | 213 +++++++++------------
3 files changed, 124 insertions(+), 129 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 868871288e6..90a9b30d534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1708,9 +1708,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
long absNextIdxStartTime = System.nanoTime();
- // Signal to archiver that we are done with the segment and it can be
archived.
+ // Signal to archiver that we are done with the segment, and it can be
archived.
long absNextIdx = archiver0.nextAbsoluteSegmentIndex();
+ assert absNextIdx == curIdx + 1 : "curIdx=" + curIdx + ", nextIdx=" +
absNextIdx;
+
long absNextIdxWaitTime = U.nanosToMillis(System.nanoTime() -
absNextIdxStartTime);
if (absNextIdxWaitTime > THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT) {
@@ -2028,7 +2030,31 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
throw cleanErr;
try {
- long nextIdx = segmentAware.nextAbsoluteSegmentIndex();
+ long nextIdx;
+
+ boolean interrupted = false;
+
+ while (true) {
+ try {
+ nextIdx = segmentAware.nextAbsoluteSegmentIndex();
+
+ break;
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ if (isCancelled.get()) {
+ // Archiver will soon complete its work, so we can
not wait for the segment to be archived.
+ throw e;
+ }
+ else {
+ // It is assumed that the interruption of the
thread came for example from a user thread,
+ // which should not interrupt write to the WAL, so
we should just try again.
+ interrupted = true;
+ }
+ }
+ }
+
+ if (interrupted)
+ Thread.currentThread().interrupt();
synchronized (this) {
// Wait for formatter so that we do not open an empty file
in DEFAULT mode.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
index 6672879b1a6..1883d2ae3d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
@@ -88,12 +88,8 @@ class SegmentCurrentStateStorage extends SegmentObservable {
long nextAbsIdx;
synchronized (this) {
- curAbsWalIdx++;
-
- notifyAll();
-
try {
- while (curAbsWalIdx - lastAbsArchivedIdx > walSegmentsCnt &&
!forceInterrupted)
+ while ((curAbsWalIdx + 1) - lastAbsArchivedIdx >
walSegmentsCnt && !forceInterrupted)
wait();
}
catch (InterruptedException e) {
@@ -103,7 +99,9 @@ class SegmentCurrentStateStorage extends SegmentObservable {
if (forceInterrupted)
throw new IgniteInterruptedCheckedException("Interrupt waiting
of change archived idx");
- nextAbsIdx = curAbsWalIdx;
+ nextAbsIdx = ++curAbsWalIdx;
+
+ notifyAll();
}
notifyObservers(nextAbsIdx);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
index 076db8fdeda..e36a35d0f90 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.processors.cache.persistence.db.file;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
@@ -32,17 +34,20 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
import
org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.util.IgniteUtils.MB;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+
/**
* Test what interruptions of writing threads do not affect PDS.
*/
@@ -50,15 +55,6 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
/** */
public static final int THREADS_CNT = 100;
- /** */
- private static final int VAL_LEN = 8192;
-
- /** */
- private static final byte[] PAYLOAD = new byte[VAL_LEN];
-
- /** */
- private volatile boolean stop;
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName)
throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -66,13 +62,14 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalMode(WALMode.LOG_ONLY)
.setWalFsyncDelayNanos(0)
- .setWalSegmentSize(1024 * 1024)
+ .setWalSegmentSize((int)MB)
+ .setWalSegments(2)
.setFileIOFactory(new AsyncFileIOFactory())
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
- .setInitialSize(10L * 1024L * 1024L)
- .setMaxSize(10L * 1024L * 1024L)
+ .setInitialSize(10L * MB)
+ .setMaxSize(10L * MB)
));
cfg.setCacheConfiguration(
@@ -83,20 +80,29 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
return cfg;
}
- /** */
- @Before
- public void before() throws Exception {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
cleanPersistenceDir();
}
- /** */
- @After
- public void after() throws Exception {
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
stopAllGrids();
cleanPersistenceDir();
}
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String
igniteInstanceName) {
+ return new StopNodeFailureHandler();
+ }
+
/**
* Tests interruptions on read.
*
@@ -104,84 +110,67 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
*/
@Test
public void testInterruptsOnRead() throws Exception {
- Ignite ignite = startGrid();
+ Ignite ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
- int maxKey = 10_000;
+ int keyCount = 10_000;
- Set<Integer> keysToCheck = new HashSet<>();
-
- Thread[] workers = new Thread[THREADS_CNT];
+ byte[] value = new byte[8_192];
// Load data.
try (IgniteDataStreamer<Integer, byte[]> st =
ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
st.allowOverwrite(true);
- for (int i = 0; i < maxKey; i++) {
- keysToCheck.add(i);
-
- st.addData(i, PAYLOAD);
- }
+ for (int i = 0; i < keyCount; i++)
+ st.addData(i, value);
}
IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
- AtomicReference<Throwable> fail = new AtomicReference<>();
+ Collection<Throwable> readThreadsError = new ConcurrentLinkedQueue<>();
- for (int i = 0; i < workers.length; i++) {
- workers[i] = new Thread(() ->
cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5)));
+ CountDownLatch startThreadsLatch = new CountDownLatch(THREADS_CNT);
- workers[i].setName("reader-" + i);
+ Thread[] workers = new Thread[THREADS_CNT];
- workers[i].setUncaughtExceptionHandler((t, e) -> {
- // We can get IgniteInterruptedException on
GridCacheAdapter.asyncOpsSem if thread was interrupted
- // before asyncOpsSem.acquire().
- if (!X.hasCause(e,
- "Failed to wait for asynchronous operation permit",
- IgniteInterruptedException.class)) {
- fail.compareAndSet(null, e);
- }
- });
+ for (int i = 0; i < workers.length; i++) {
+ workers[i] = new Thread(
+ () -> {
+ try {
+ startThreadsLatch.countDown();
+
+ cache.get(ThreadLocalRandom.current().nextInt(keyCount
/ 5));
+ }
+ catch (Throwable throwable) {
+ if (!X.hasCause(
+ throwable,
+ "Failed to wait for asynchronous operation permit",
+ IgniteInterruptedException.class
+ ))
+ readThreadsError.add(throwable);
+ }
+ },
+ "cache-reader-from-test" + i
+ );
}
for (Thread worker : workers)
worker.start();
- // Interrupts should not affect reads.
- for (int i = 0; i < workers.length / 2; i++)
- workers[i].interrupt();
-
- U.sleep(3_000);
+ assertTrue(startThreadsLatch.await(3, TimeUnit.SECONDS));
+ // Interrupts should not affect reads.
for (Thread worker : workers)
- worker.join();
-
- Throwable t = fail.get();
-
- assertNull(t);
-
- int verifiedKeys = 0;
-
- // Get all keys.
- Map<Integer, byte[]> res = cache.getAll(keysToCheck);
-
- Assert.assertEquals(maxKey, keysToCheck.size());
- Assert.assertEquals(maxKey, res.size());
-
- // Post check.
- for (Integer key: keysToCheck) {
- byte[] val = res.get(key);
-
- assertNotNull(val);
- assertEquals("Illegal length", VAL_LEN, val.length);
+ worker.interrupt();
- verifiedKeys++;
- }
+ for (Thread worker : workers)
+ worker.join(TimeUnit.SECONDS.toMillis(1));
- Assert.assertEquals(maxKey, verifiedKeys);
+ assertThat(readThreadsError, empty());
- log.info("Verified keys: " + verifiedKeys);
+ for (int i = 0; i < keyCount; i++)
+ assertArrayEquals(String.valueOf(i), cache.get(i), value);
}
/**
@@ -190,40 +179,40 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-17719")
public void testInterruptsOnWALWrite() throws Exception {
- Ignite ignite = startGrid();
+ Ignite ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
- int maxKey = 100_000;
+ IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
Set<Integer> keysToCheck = new GridConcurrentHashSet<>();
- Thread[] workers = new Thread[THREADS_CNT];
+ Collection<Throwable> writeThreadsError = new
ConcurrentLinkedQueue<>();
- AtomicReference<Throwable> fail = new AtomicReference<>();
+ AtomicBoolean stop = new AtomicBoolean();
+
+ byte[] value = new byte[8_192];
+
+ Thread[] workers = new Thread[THREADS_CNT];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Thread(() -> {
- IgniteCache<Object, Object> cache =
ignite.cache(DEFAULT_CACHE_NAME);
+ try {
+ while (!stop.get()) {
+ int key = ThreadLocalRandom.current().nextInt(100_000);
- while (!stop) {
- int key = ThreadLocalRandom.current().nextInt(maxKey);
+ cache.put(key, value);
- cache.put(key, PAYLOAD);
-
- keysToCheck.add(key);
+ keysToCheck.add(key);
+ }
+ }
+ catch (Throwable throwable) {
+ writeThreadsError.add(throwable);
}
- });
+ }, "cache-writer-from-test" + i);
workers[i].setName("writer-" + i);
-
- workers[i].setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
- @Override public void uncaughtException(Thread t, Throwable e)
{
- fail.compareAndSet(null, e);
- }
- });
}
for (Thread worker : workers)
@@ -237,33 +226,15 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
Thread.sleep(3_000);
- stop = true;
+ stop.set(true);
for (Thread worker : workers)
- worker.join();
-
- Throwable t = fail.get();
-
- assertNull(t);
-
- IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
-
- int verifiedKeys = 0;
+ worker.join(TimeUnit.SECONDS.toMillis(1));
- Map<Integer, byte[]> res = cache.getAll(keysToCheck);
-
- Assert.assertEquals(res.size(), keysToCheck.size());
+ assertThat(writeThreadsError, empty());
// Post check.
- for (Integer key: keysToCheck) {
- byte[] val = res.get(key);
-
- assertNotNull(val);
- assertEquals("Illegal length", VAL_LEN, val.length);
-
- verifiedKeys++;
- }
-
- log.info("Verified keys: " + verifiedKeys);
+ for (Integer key: keysToCheck)
+ assertArrayEquals(String.valueOf(key), value, cache.get(key));
}
}