This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 31b132e IGNITE-10891 Fix
IgnitePdsThreadInterruptionTest.testInterruptsOnLFSRead flaky in PDS indexing -
Fixes #5810.
31b132e is described below
commit 31b132e936c5413c8677be2da8800e16ba8c7576
Author: Dmitriy Govorukhin <[email protected]>
AuthorDate: Tue Dec 25 14:01:50 2018 +0300
IGNITE-10891 Fix IgnitePdsThreadInterruptionTest.testInterruptsOnLFSRead
flaky in PDS indexing - Fixes #5810.
---
.../pagemem/DelayedPageReplacementTracker.java | 8 +-
.../org/apache/ignite/internal/util/typedef/X.java | 28 ++-
.../db/file/IgnitePdsThreadInterruptionTest.java | 206 +++++++++++----------
.../apache/ignite/util/GridCommandHandlerTest.java | 2 +
4 files changed, 138 insertions(+), 106 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
index aa1b061..c04073e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
@@ -163,17 +163,23 @@ public class DelayedPageReplacementTracker {
if (!hasLockedPages)
return;
+ boolean interrupted = false;
+
while (locked.contains(id)) {
if (log.isDebugEnabled())
log.debug("Found replaced page [" + id + "] which is
being written to page store, wait for finish replacement");
try {
+ // Uninterruptible wait.
locked.wait();
}
catch (InterruptedException e) {
- throw new IgniteInterruptedException(e);
+ interrupted = true;
}
}
+
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
index 59371e3..7c5fd81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
@@ -460,12 +460,13 @@ public final class X {
* into check.
*
* @param t Throwable to check (if {@code null}, {@code false} is
returned).
+ * @param msg Message text that should be in cause.
* @param cls Cause classes to check (if {@code null} or empty, {@code
false} is returned).
* @return {@code True} if one of the causing exception is an instance of
passed in classes,
* {@code false} otherwise.
*/
@SafeVarargs
- public static boolean hasCause(@Nullable Throwable t, @Nullable
Class<?>... cls) {
+ public static boolean hasCause(@Nullable Throwable t, @Nullable String
msg, @Nullable Class<?>... cls) {
if (t == null || F.isEmpty(cls))
return false;
@@ -473,12 +474,20 @@ public final class X {
for (Throwable th = t; th != null; th = th.getCause()) {
for (Class<?> c : cls) {
- if (c.isAssignableFrom(th.getClass()))
+ if (c.isAssignableFrom(th.getClass())) {
+ if (msg != null) {
+ if (th.getMessage() != null &&
th.getMessage().contains(msg))
+ return true;
+ else
+ continue;
+ }
+
return true;
+ }
}
for (Throwable n : th.getSuppressed()) {
- if (hasCause(n, cls))
+ if (hasCause(n, msg, cls))
return true;
}
@@ -490,6 +499,19 @@ public final class X {
}
/**
+ * Checks if passed in {@code 'Throwable'} has given class in {@code
'cause'} hierarchy <b>including</b> that
+ * throwable itself. <p> Note that this method also includes {@link
Throwable#getSuppressed()} in the check.
+ *
+ * @param t Throwable to check (if {@code null}, {@code false} is
returned).
+ * @param cls Cause classes to check (if {@code null} or empty, {@code
false} is returned).
+ * @return {@code True} if one of the causing exceptions is an instance of
passed in classes, {@code false}
+ * otherwise.
+ */
+ public static boolean hasCause(@Nullable Throwable t, @Nullable
Class<?>... cls) {
+ return hasCause(t, null, cls);
+ }
+
+ /**
* Checks if passed throwable has given class in one of the suppressed
exceptions.
*
* @param t Throwable to check (if {@code null}, {@code false} is
returned).
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
index 4b7db7d..9f7f791 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -17,19 +17,29 @@
package org.apache.ignite.internal.processors.cache.persistence.db.file;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import
org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -40,110 +50,95 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
/** */
- private static final int PAGE_SIZE = 1 << 12; // 4096
+ public static final int THREADS_CNT = 100;
/** */
- public static final int THREADS_CNT = 100;
+ private static final int VAL_LEN = 8192;
- /**
- * Cache name.
- */
- private final String CACHE_NAME = "cache";
+ /** */
+ private static final byte[] PAYLOAD = new byte[VAL_LEN];
/** */
- private volatile boolean stop = false;
+ private volatile boolean stop;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName)
throws Exception {
- final IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setDataStorageConfiguration(storageConfiguration());
-
- CacheConfiguration ccfg = new CacheConfiguration<>(CACHE_NAME);
-
- RendezvousAffinityFunction affinityFunction = new
RendezvousAffinityFunction();
- affinityFunction.setPartitions(1);
-
- ccfg.setAffinity(affinityFunction);
-
- cfg.setCacheConfiguration(ccfg);
-
- return cfg;
- }
-
- /**
- * @return DataStorage configuration.
- */
- private DataStorageConfiguration storageConfiguration() {
- DataRegionConfiguration regionCfg = new DataRegionConfiguration()
- .setInitialSize(10L * 1024L * 1024L)
- .setMaxSize(10L * 1024L * 1024L)
- .setPageEvictionMode(DataPageEvictionMode.RANDOM_LRU);
-
- DataStorageConfiguration cfg = new DataStorageConfiguration()
- .setWalMode(WALMode.LOG_ONLY)
- .setWalFsyncDelayNanos(0)
- .setPageSize(PAGE_SIZE)
- .setFileIOFactory(new AsyncFileIOFactory());
-
- cfg.setDefaultDataRegionConfiguration(regionCfg);
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setWalMode(WALMode.LOG_ONLY)
+ .setWalFsyncDelayNanos(0)
+ .setFileIOFactory(new AsyncFileIOFactory())
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setInitialSize(10L * 1024L * 1024L)
+ .setMaxSize(10L * 1024L * 1024L)
+ ));
+
+ cfg.setCacheConfiguration(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 1))
+ );
return cfg;
}
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTestsStarted();
-
+ @Before
+ public void before() throws Exception {
cleanPersistenceDir();
}
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
+ @After
+ public void after() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/**
- * Tests interruptions on LFS read.
+ * Tests interruptions on read.
*
* @throws Exception If failed.
*/
@Test
- public void testInterruptsOnLFSRead() throws Exception {
- final Ignite ignite = startGrid();
-
- ignite.active(true);
+ public void testInterruptsOnRead() throws Exception {
+ Ignite ignite = startGrid();
- final int valLen = 8192;
+ ignite.cluster().active(true);
- final byte[] payload = new byte[valLen];
+ int maxKey = 10_000;
- final int maxKey = 10_000;
+ Set<Integer> keysToCheck = new HashSet<>();
Thread[] workers = new Thread[THREADS_CNT];
+ // Load data.
+ try (IgniteDataStreamer<Integer, byte[]> st =
ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+ st.allowOverwrite(true);
- final IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
-
- for (int i=0; i < maxKey; i++)
- cache.put(i, payload);
+ for (int i = 0; i < maxKey; i++){
+ keysToCheck.add(i);
- final AtomicReference<Throwable> fail = new AtomicReference<>();
+ st.addData(i, PAYLOAD);
+ }
+ }
+ IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
- Runnable clo = new Runnable() {
- @Override public void run() {
- cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5));
- }
- };
+ AtomicReference<Throwable> fail = new AtomicReference<>();
for (int i = 0; i < workers.length; i++) {
- workers[i] = new Thread(clo);
+ workers[i] = new Thread(() ->
cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5)));
+
workers[i].setName("reader-" + i);
- workers[i].setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
- @Override public void uncaughtException(Thread t, Throwable e)
{
+
+ workers[i].setUncaughtExceptionHandler((t, e) -> {
+ // We can get IgniteInterruptedException on
GridCacheAdapter.asyncOpsSem if thread was interrupted
+ // before asyncOpsSem.acquire().
+ if (!X.hasCause(e,
+ "Failed to wait for asynchronous operation permit",
+ IgniteInterruptedException.class)) {
fail.compareAndSet(null, e);
}
});
@@ -152,15 +147,11 @@ public class IgnitePdsThreadInterruptionTest extends
GridCommonAbstractTest {
for (Thread worker : workers)
worker.start();
- //Thread.sleep(3_000);
-
// Interrupts should not affect reads.
for (int i = 0;i < workers.length / 2; i++)
workers[i].interrupt();
- Thread.sleep(3_000);
-
- stop = true;
+ U.sleep(3_000);
for (Thread worker : workers)
worker.join();
@@ -171,53 +162,61 @@ public class IgnitePdsThreadInterruptionTest extends
GridCommonAbstractTest {
int verifiedKeys = 0;
+ // Get all keys.
+ Map<Integer, byte[]> res = cache.getAll(keysToCheck);
+
+ Assert.assertEquals(maxKey, keysToCheck.size());
+ Assert.assertEquals(maxKey, res.size());
+
// Post check.
- for (int i = 0; i < maxKey; i++) {
- byte[] val = (byte[]) cache.get(i);
+ for (Integer key: keysToCheck) {
+ byte[] val = res.get(key);
- if (val != null) {
- assertEquals("Illegal length", valLen, val.length);
+ assertNotNull(val);
+ assertEquals("Illegal length", VAL_LEN, val.length);
- verifiedKeys++;
- }
+ verifiedKeys++;
}
+ Assert.assertEquals(maxKey, verifiedKeys);
+
log.info("Verified keys: " + verifiedKeys);
}
/**
* Tests interruptions on WAL write.
*
- * @throws Exception
+ * @throws Exception If failed.
*/
@Test
public void testInterruptsOnWALWrite() throws Exception {
- final Ignite ignite = startGrid();
+ Ignite ignite = startGrid();
- ignite.active(true);
+ ignite.cluster().active(true);
- final int valLen = 8192;
+ int maxKey = 100_000;
- final byte[] payload = new byte[valLen];
-
- final int maxKey = 100_000;
+ Set<Integer> keysToCheck = new GridConcurrentHashSet<>();
Thread[] workers = new Thread[THREADS_CNT];
- final AtomicReference<Throwable> fail = new AtomicReference<>();
+ AtomicReference<Throwable> fail = new AtomicReference<>();
- Runnable clo = new Runnable() {
- @Override public void run() {
- IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
+ for (int i = 0; i < workers.length; i++) {
+ workers[i] = new Thread(() -> {
+ IgniteCache<Object, Object> cache =
ignite.cache(DEFAULT_CACHE_NAME);
- while (!stop)
- cache.put(ThreadLocalRandom.current().nextInt(maxKey),
payload);
- }
- };
+ while (!stop) {
+ int key = ThreadLocalRandom.current().nextInt(maxKey);
+
+ cache.put(key, PAYLOAD);
+
+ keysToCheck.add(key);
+ }
+ });
- for (int i = 0; i < workers.length; i++) {
- workers[i] = new Thread(clo);
workers[i].setName("writer-" + i);
+
workers[i].setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
@Override public void uncaughtException(Thread t, Throwable e)
{
fail.compareAndSet(null, e);
@@ -245,19 +244,22 @@ public class IgnitePdsThreadInterruptionTest extends
GridCommonAbstractTest {
assertNull(t);
- IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
+ IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
int verifiedKeys = 0;
+ Map<Integer, byte[]> res = cache.getAll(keysToCheck);
+
+ Assert.assertEquals(res.size(), keysToCheck.size());
+
// Post check.
- for (int i = 0; i < maxKey; i++) {
- byte[] val = (byte[]) cache.get(i);
+ for (Integer key: keysToCheck) {
+ byte[] val = res.get(key);
- if (val != null) {
- assertEquals("Illegal length", valLen, val.length);
+ assertNotNull(val);
+ assertEquals("Illegal length", VAL_LEN, val.length);
- verifiedKeys++;
- }
+ verifiedKeys++;
}
log.info("Verified keys: " + verifiedKeys);
diff --git
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index e6a3949..3817994 100644
---
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -103,6 +103,7 @@ import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -691,6 +692,7 @@ public class GridCommandHandlerTest extends
GridCommonAbstractTest {
* Simulate uncommitted backup transactions and test rolling back using
utility.
*/
@Test
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10899")
public void testKillHangingRemoteTransactions() throws Exception {
final int cnt = 3;