This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 08efb58 Replace remaining usages of parallel collections with
explicit concurrency. (#4843)
08efb58 is described below
commit 08efb58cf5eef3475bdb41e45db448250bcf9c17
Author: Markus Thömmes <[email protected]>
AuthorDate: Thu Feb 27 16:52:11 2020 +0100
Replace remaining usages of parallel collections with explicit concurrency.
(#4843)
---
.../core/database/test/CacheConcurrencyTests.scala | 43 ++++++++++++----------
1 file changed, 23 insertions(+), 20 deletions(-)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala
index 90c8bb7..0026df8 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala
@@ -17,9 +17,9 @@
package org.apache.openwhisk.core.database.test
-import scala.collection.parallel._
import scala.concurrent.duration.DurationInt
-import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.Executors
+
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatest.FlatSpec
@@ -32,9 +32,17 @@ import spray.json.JsString
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.utils.retry
-@RunWith(classOf[JUnitRunner])
-class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with
WskActorSystem with BeforeAndAfterEach {
+import scala.concurrent.ExecutionContext
+@RunWith(classOf[JUnitRunner])
+class CacheConcurrencyTests
+ extends FlatSpec
+ with WskTestHelpers
+ with WskActorSystem
+ with BeforeAndAfterEach
+ with ConcurrencyHelpers {
+
+ val timeout = 5.minutes
println(s"Running tests on # proc:
${Runtime.getRuntime.availableProcessors()}")
implicit private val transId = TransactionId.testing
@@ -43,15 +51,14 @@ class CacheConcurrencyTests extends FlatSpec with
WskTestHelpers with WskActorSy
val nExternalIters = 1
val nInternalIters = 5
- val nThreads = nInternalIters * 30
+ val nThreads = nInternalIters * 30 // The maximum number of tasks running in
parallel at any given time
+ val parallelismExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(nThreads))
- val parallel = (1 to nInternalIters).par
- parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
-
- def run[W](phase: String)(block: String => W) = parallel.map { i =>
- val name = s"testy${i}"
- withClue(s"$phase: failed for $name") { (name, block(name)) }
- }
+ def run[W](phase: String)(block: String => W) =
+ concurrently((1 to nInternalIters), timeout) { i =>
+ val name = s"testy${i}"
+ withClue(s"$phase: failed for $name") { (name, block(name)) }
+ }(parallelismExecutionContext)
override def beforeEach() = {
run("pre-test sanitize") { name =>
@@ -79,9 +86,7 @@ class CacheConcurrencyTests extends FlatSpec with
WskTestHelpers with WskActorSy
run("delete+get") { name =>
// run 30 operations in parallel: 15 get, 1 delete, 14 more get
- val para = (1 to 30).par
- para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
- para.map { i =>
+ concurrently((1 to 30), timeout) { i =>
if (i != 16) {
val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT)
withClue(s"expecting get to either succeed or fail with not found:
$rr") {
@@ -91,7 +96,7 @@ class CacheConcurrencyTests extends FlatSpec with
WskTestHelpers with WskActorSy
} else {
wsk.action.delete(name)
}
- }
+ }(parallelismExecutionContext)
}
// Give some time to replicate the state between the controllers
@@ -117,9 +122,7 @@ class CacheConcurrencyTests extends FlatSpec with
WskTestHelpers with WskActorSy
run("update+get") { name =>
// run 30 operations in parallel: 15 get, 1 update, 14 more get
- val para = (1 to 30).par
- para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
- para.map { i =>
+ concurrently((1 to 30), timeout) { i =>
if (i != 16) {
val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT)
withClue(s"expecting get to either succeed or fail with not found:
$rr") {
@@ -129,7 +132,7 @@ class CacheConcurrencyTests extends FlatSpec with
WskTestHelpers with WskActorSy
} else {
wsk.action.create(name, None, parameters = Map("color" ->
JsString("blue")), update = true)
}
- }
+ }(parallelismExecutionContext)
}
// All controllers should have the correct action