This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new fa82bc8  GEODE-9660: ConcurrentLoopingThreads should not run action in 
case of thread exceptions (#6993)
fa82bc8 is described below

commit fa82bc871821c78818e518b7ce84a1286e6abe28
Author: Jens Deppe <[email protected]>
AuthorDate: Thu Oct 14 05:36:23 2021 -0700

    GEODE-9660: ConcurrentLoopingThreads should not run action in case of 
thread exceptions (#6993)
---
 .../geode/redis/ConcurrentLoopingThreads.java      |   9 +-
 .../geode/redis/ConcurrentLoopingThreadsTest.java  | 148 +++++++++++++++++++++
 .../key/AbstractRenameIntegrationTest.java         |  15 +--
 3 files changed, 152 insertions(+), 20 deletions(-)

diff --git 
a/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
 
b/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
index b1ec0af..21a0cc1 100644
--- 
a/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
+++ 
b/geode-for-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
@@ -165,12 +165,9 @@ public class ConcurrentLoopingThreads {
         waitForBarrier();
       }
       for (int i = 0; i < iterationCount && running.get(); i++) {
-        try {
-          runnable.accept(i);
-        } finally {
-          if (lockstep) {
-            waitForBarrier();
-          }
+        runnable.accept(i);
+        if (lockstep) {
+          waitForBarrier();
         }
       }
     }
diff --git 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/ConcurrentLoopingThreadsTest.java
 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/ConcurrentLoopingThreadsTest.java
new file mode 100644
index 0000000..f80a92c
--- /dev/null
+++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/ConcurrentLoopingThreadsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+public class ConcurrentLoopingThreadsTest {
+
+  @Test
+  public void withIterations_actionDoesNotRunWhenThreadsThrowExceptions() {
+    AtomicBoolean actionRan = new AtomicBoolean(false);
+    AtomicInteger count = new AtomicInteger(0);
+
+    assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+        i -> count.incrementAndGet(),
+        i -> {
+          throw new RuntimeException("BANG");
+        })
+            .runWithAction(() -> actionRan.set(true)))
+                .isInstanceOf(RuntimeException.class);
+
+    assertThat(count.get()).as("Expecting good thread to run 
once").isEqualTo(1);
+    assertThat(actionRan.get()).as("action should not run").isFalse();
+  }
+
+  @Test
+  public void withSignal_actionDoesNotRunWhenThreadsThrowExceptions() {
+    AtomicBoolean actionRan = new AtomicBoolean(false);
+    AtomicBoolean running = new AtomicBoolean(true);
+    AtomicInteger count = new AtomicInteger(0);
+
+    assertThatThrownBy(() -> new ConcurrentLoopingThreads(running,
+        i -> count.incrementAndGet(),
+        i -> {
+          throw new RuntimeException("BANG");
+        })
+            .runWithAction(() -> actionRan.set(true)))
+                .isInstanceOf(RuntimeException.class);
+
+    assertThat(count.get()).as("Expecting good thread to run 
once").isEqualTo(1);
+    assertThat(actionRan.get()).as("action should not run").isFalse();
+  }
+
+  @Test
+  public void withIterations_threadFailure_shouldStopAllOtherThreads() {
+    AtomicInteger count = new AtomicInteger(0);
+
+    assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+        i -> count.incrementAndGet(),
+        i -> {
+          throw new RuntimeException("BANG");
+        }).runInLockstep());
+
+    assertThat(count.get()).isEqualTo(1);
+  }
+
+  @Test
+  public void 
withIterations_failingThreadExceptionIsPropagatedBeforeActionException() {
+    AtomicInteger count = new AtomicInteger(0);
+
+    assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+        i -> count.incrementAndGet(),
+        i -> {
+          throw new RuntimeException("Thread BANG");
+        }).runWithAction(() -> {
+          throw new RuntimeException("Action BANG");
+        })).hasMessageContaining("Thread BANG");
+
+    assertThat(count.get()).isEqualTo(1);
+  }
+
+  @Test
+  public void withSignal_threadFailure_shouldStopAllOtherThreads() {
+    AtomicInteger count = new AtomicInteger(0);
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    assertThatThrownBy(() -> new ConcurrentLoopingThreads(running,
+        i -> count.incrementAndGet(),
+        i -> {
+          throw new RuntimeException("BANG");
+        }).runInLockstep());
+
+    assertThat(count.get()).isEqualTo(1);
+  }
+
+  @Test
+  public void withIterations_actionFailure_shouldStopImmediately() {
+    AtomicInteger count = new AtomicInteger(0);
+
+    assertThatThrownBy(() -> new ConcurrentLoopingThreads(10,
+        i -> count.incrementAndGet(),
+        i -> count.incrementAndGet())
+            .runWithAction(() -> {
+              throw new RuntimeException("BANG");
+            }))
+                .isInstanceOf(RuntimeException.class);
+
+    assertThat(count.get()).isEqualTo(2);
+  }
+
+  @Test
+  public void withSignal_actionFailure_shouldStopImmediately() {
+    AtomicInteger count = new AtomicInteger(0);
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    assertThatThrownBy(() -> new ConcurrentLoopingThreads(running,
+        i -> count.incrementAndGet(),
+        i -> count.incrementAndGet())
+            .runWithAction(() -> {
+              throw new RuntimeException("BANG");
+            }))
+                .isInstanceOf(RuntimeException.class);
+
+    assertThat(count.get()).isEqualTo(2);
+  }
+
+  @Test
+  public void withSignal_actionWillAlwaysRun() {
+    AtomicInteger count = new AtomicInteger(0);
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    new ConcurrentLoopingThreads(running,
+        i -> running.set(false))
+            .runWithAction(count::incrementAndGet);
+
+    assertThat(count.get()).isEqualTo(1);
+  }
+
+}
diff --git 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
index 1335e0c..f538555 100644
--- 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
+++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
@@ -248,25 +247,13 @@ public abstract class AbstractRenameIntegrationTest 
implements RedisIntegrationT
   public void shouldError_givenKeyDeletedDuringRename() {
     int iterations = 2000;
 
-    final AtomicReference<RuntimeException> renameException = new 
AtomicReference<>(null);
-
     jedis.set("{user1}oldKey", "foo");
 
     try {
       new ConcurrentLoopingThreads(iterations,
-          i -> {
-            try {
-              jedis.rename("{user1}oldKey", "{user1}newKey");
-            } catch (RuntimeException e) {
-              renameException.set(e);
-            }
-          },
+          i -> jedis.rename("{user1}oldKey", "{user1}newKey"),
           i -> jedis.del("{user1}oldKey"))
               .runWithAction(() -> {
-                RuntimeException e = renameException.get();
-                if (e != null) {
-                  throw e;
-                }
                 assertThat(jedis.get("{user1}newKey")).isEqualTo("foo");
                 assertThat(jedis.exists("{user1}oldKey")).isFalse();
                 flushAll();

Reply via email to