This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new fa82bc8 GEODE-9660: ConcurrentLoopingThreads should not run action in case of thread exceptions (#6993)
fa82bc8 is described below
commit fa82bc871821c78818e518b7ce84a1286e6abe28
Author: Jens Deppe <[email protected]>
AuthorDate: Thu Oct 14 05:36:23 2021 -0700
GEODE-9660: ConcurrentLoopingThreads should not run action in case of thread exceptions (#6993)
---
.../geode/redis/ConcurrentLoopingThreads.java | 9 +-
.../geode/redis/ConcurrentLoopingThreadsTest.java | 148 +++++++++++++++++++++
.../key/AbstractRenameIntegrationTest.java | 15 +--
3 files changed, 152 insertions(+), 20 deletions(-)
diff --git a/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java b/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
index b1ec0af..21a0cc1 100644
--- a/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
+++ b/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
@@ -165,12 +165,9 @@ public class ConcurrentLoopingThreads {
waitForBarrier();
}
for (int i = 0; i < iterationCount && running.get(); i++) {
- try {
- runnable.accept(i);
- } finally {
- if (lockstep) {
- waitForBarrier();
- }
+ runnable.accept(i);
+ if (lockstep) {
+ waitForBarrier();
}
}
}
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/ConcurrentLoopingThreadsTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/ConcurrentLoopingThreadsTest.java
new file mode 100644
index 0000000..f80a92c
--- /dev/null
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/ConcurrentLoopingThreadsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+public class ConcurrentLoopingThreadsTest {
+
+ @Test
+ public void withIterations_actionDoesNotRunWhenThreadsThrowExceptions() {
+ AtomicBoolean actionRan = new AtomicBoolean(false);
+ AtomicInteger count = new AtomicInteger(0);
+
+ assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+ i -> count.incrementAndGet(),
+ i -> {
+ throw new RuntimeException("BANG");
+ })
+ .runWithAction(() -> actionRan.set(true)))
+ .isInstanceOf(RuntimeException.class);
+
+ assertThat(count.get()).as("Expecting good thread to run once").isEqualTo(1);
+ assertThat(actionRan.get()).as("action should not run").isFalse();
+ }
+
+ @Test
+ public void withSignal_actionDoesNotRunWhenThreadsThrowExceptions() {
+ AtomicBoolean actionRan = new AtomicBoolean(false);
+ AtomicBoolean running = new AtomicBoolean(true);
+ AtomicInteger count = new AtomicInteger(0);
+
+ assertThatThrownBy(() -> new ConcurrentLoopingThreads(running,
+ i -> count.incrementAndGet(),
+ i -> {
+ throw new RuntimeException("BANG");
+ })
+ .runWithAction(() -> actionRan.set(true)))
+ .isInstanceOf(RuntimeException.class);
+
+ assertThat(count.get()).as("Expecting good thread to run once").isEqualTo(1);
+ assertThat(actionRan.get()).as("action should not run").isFalse();
+ }
+
+ @Test
+ public void withIterations_threadFailure_shouldStopAllOtherThreads() {
+ AtomicInteger count = new AtomicInteger(0);
+
+ assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+ i -> count.incrementAndGet(),
+ i -> {
+ throw new RuntimeException("BANG");
+ }).runInLockstep());
+
+ assertThat(count.get()).isEqualTo(1);
+ }
+
+ @Test
+ public void withIterations_failingThreadExceptionIsPropagatedBeforeActionException() {
+ AtomicInteger count = new AtomicInteger(0);
+
+ assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+ i -> count.incrementAndGet(),
+ i -> {
+ throw new RuntimeException("Thread BANG");
+ }).runWithAction(() -> {
+ throw new RuntimeException("Action BANG");
+ })).hasMessageContaining("Thread BANG");
+
+ assertThat(count.get()).isEqualTo(1);
+ }
+
+ @Test
+ public void withSignal_threadFailure_shouldStopAllOtherThreads() {
+ AtomicInteger count = new AtomicInteger(0);
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ assertThatThrownBy(() -> new ConcurrentLoopingThreads(running,
+ i -> count.incrementAndGet(),
+ i -> {
+ throw new RuntimeException("BANG");
+ }).runInLockstep());
+
+ assertThat(count.get()).isEqualTo(1);
+ }
+
+ @Test
+ public void withIterations_actionFailure_shouldStopImmediately() {
+ AtomicInteger count = new AtomicInteger(0);
+
+ assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+ i -> count.incrementAndGet(),
+ i -> count.incrementAndGet())
+ .runWithAction(() -> {
+ throw new RuntimeException("BANG");
+ }))
+ .isInstanceOf(RuntimeException.class);
+
+ assertThat(count.get()).isEqualTo(2);
+ }
+
+ @Test
+ public void withSignal_actionFailure_shouldStopImmediately() {
+ AtomicInteger count = new AtomicInteger(0);
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ assertThatThrownBy(() -> new ConcurrentLoopingThreads(running,
+ i -> count.incrementAndGet(),
+ i -> count.incrementAndGet())
+ .runWithAction(() -> {
+ throw new RuntimeException("BANG");
+ }))
+ .isInstanceOf(RuntimeException.class);
+
+ assertThat(count.get()).isEqualTo(2);
+ }
+
+ @Test
+ public void withSignal_actionWillAlwaysRun() {
+ AtomicInteger count = new AtomicInteger(0);
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ new ConcurrentLoopingThreads(running,
+ i -> running.set(false))
+ .runWithAction(count::incrementAndGet);
+
+ assertThat(count.get()).isEqualTo(1);
+ }
+
+}
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
index 1335e0c..f538555 100644
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
@@ -248,25 +247,13 @@ public abstract class AbstractRenameIntegrationTest implements RedisIntegrationT
public void shouldError_givenKeyDeletedDuringRename() {
int iterations = 2000;
- final AtomicReference<RuntimeException> renameException = new AtomicReference<>(null);
-
jedis.set("{user1}oldKey", "foo");
try {
new ConcurrentLoopingThreads(iterations,
- i -> {
- try {
- jedis.rename("{user1}oldKey", "{user1}newKey");
- } catch (RuntimeException e) {
- renameException.set(e);
- }
- },
+ i -> jedis.rename("{user1}oldKey", "{user1}newKey"),
i -> jedis.del("{user1}oldKey"))
.runWithAction(() -> {
- RuntimeException e = renameException.get();
- if (e != null) {
- throw e;
- }
assertThat(jedis.get("{user1}newKey")).isEqualTo("foo");
assertThat(jedis.exists("{user1}oldKey")).isFalse();
flushAll();