This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 2c33e81 Replace parallel collections with future based concurrency in
tests. (#4841)
2c33e81 is described below
commit 2c33e81df3e623a77afa17afdf3590f06a8d3b9d
Author: Markus Thömmes <[email protected]>
AuthorDate: Thu Feb 27 14:02:51 2020 +0100
Replace parallel collections with future based concurrency in tests. (#4841)
Scala 2.13 puts parallel collections into a separate module that's not
compatible with Scala 2.12. To avoid having to work around things and to keep
cross-compilation compatibility this just exchanges the approach for
concurrency in tests to not use parallel collections at all.
---
.../src/test/scala/common/ConcurrencyHelpers.scala | 30 ++++++++++
tests/src/test/scala/limits/ThrottleTests.scala | 19 +++---
.../openwhisk/common/ForcibleSemaphoreTests.scala | 11 +++-
.../openwhisk/common/NestedSemaphoreTests.scala | 25 +++++---
.../openwhisk/common/ResizableSemaphoreTests.scala | 17 ++++--
.../core/controller/test/ActionsApiTests.scala | 2 +-
.../core/limits/MaxActionDurationTests.scala | 70 ++++++++++------------
.../test/ShardingContainerPoolBalancerTests.scala | 6 +-
8 files changed, 115 insertions(+), 65 deletions(-)
diff --git a/tests/src/test/scala/common/ConcurrencyHelpers.scala
b/tests/src/test/scala/common/ConcurrencyHelpers.scala
new file mode 100644
index 0000000..39fb1f5
--- /dev/null
+++ b/tests/src/test/scala/common/ConcurrencyHelpers.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+trait ConcurrencyHelpers {
+ def concurrently[T](times: Int, timeout: FiniteDuration)(op: => T)(implicit
ec: ExecutionContext): Iterable[T] =
+ Await.result(Future.sequence((1 to times).map(_ => Future(op))), timeout)
+
+ def concurrently[B, T](over: Iterable[B], timeout: FiniteDuration)(op: B =>
T)(
+ implicit ec: ExecutionContext): Iterable[T] =
+ Await.result(Future.sequence(over.map(v => Future(op(v)))), timeout)
+}
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala
b/tests/src/test/scala/limits/ThrottleTests.scala
index ee67e3c..7b9e00f 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -21,9 +21,7 @@ import java.time.Instant
import akka.http.scaladsl.model.StatusCodes.TooManyRequests
-import scala.collection.parallel.immutable.ParSeq
-import scala.concurrent.Future
-import scala.concurrent.Promise
+import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
@@ -102,10 +100,15 @@ class ThrottleTests
*
* @param results the sequence of results from invocations or firings
*/
- def waitForActivations(results: ParSeq[RunResult]) = results.foreach {
result =>
- if (result.exitCode == SUCCESS_EXIT) {
- withActivation(wsk.activation, result, totalWait = 5.minutes)(identity)
+ def waitForActivations(results: Seq[RunResult]) = {
+ val done = results.map { result =>
+ if (result.exitCode == SUCCESS_EXIT) {
+ Future(withActivation(wsk.activation, result, totalWait = 5.minutes)(_
=> ()))
+ } else {
+ Future.successful(())
+ }
}
+ Await.result(Future.sequence(done), 5.minutes)
}
/**
@@ -201,7 +204,7 @@ class ThrottleTests
// wait for the activations last, if these fail, the throttle should be
settled
// and this gives the activations time to complete and may avoid
unnecessarily polling
println("waiting for activations to complete")
- waitForActivations(results.par)
+ waitForActivations(results)
}
it should "throttle multiple activations of one trigger" in
withAssetCleaner(wskprops) { (wp, assetHelper) =>
@@ -286,7 +289,7 @@ class ThrottleTests
// wait for the activations last, giving the activations time to complete
and
// may avoid unnecessarily polling; if these fail, the throttle may not be
settled
println("waiting for activations to complete")
- waitForActivations(combinedResults.par)
+ waitForActivations(combinedResults)
}
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala
index cb4939d..756521f 100644
---
a/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala
@@ -17,12 +17,19 @@
package org.apache.openwhisk.common
+import common.ConcurrencyHelpers
+import org.apache.openwhisk.utils.ExecutionContextFactory
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
+import scala.concurrent.duration.DurationInt
+
@RunWith(classOf[JUnitRunner])
-class ForcibleSemaphoreTests extends FlatSpec with Matchers {
+class ForcibleSemaphoreTests extends FlatSpec with Matchers with
ConcurrencyHelpers {
+ // use an infinite thread pool to allow for maximum concurrency
+ implicit val executionContext =
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+
behavior of "ForcableSemaphore"
it should "not allow to acquire, force or release negative amounts of
permits" in {
@@ -79,7 +86,7 @@ class ForcibleSemaphoreTests extends FlatSpec with Matchers {
(0 until 100).foreach { _ =>
val s = new ForcibleSemaphore(32)
// try to acquire more permits than allowed in parallel
- val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+ val acquires = concurrently(64, 1.minute)(s.tryAcquire())
val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
acquires should contain theSameElementsAs result
diff --git
a/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala
index e87c734..b421d03 100644
---
a/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala
@@ -17,13 +17,21 @@
package org.apache.openwhisk.common
+import common.ConcurrencyHelpers
+import org.apache.openwhisk.utils.ExecutionContextFactory
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
+import scala.concurrent.duration.DurationInt
+
@RunWith(classOf[JUnitRunner])
-class NestedSemaphoreTests extends FlatSpec with Matchers {
+class NestedSemaphoreTests extends FlatSpec with Matchers with
ConcurrencyHelpers {
+ // use an infinite thread pool to allow for maximum concurrency
+ implicit val executionContext =
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+ val acquireTimeout = 1.minute
+
behavior of "NestedSemaphore"
it should "allow acquire of concurrency permits before acquire of memory
permits" in {
@@ -34,16 +42,17 @@ class NestedSemaphoreTests extends FlatSpec with Matchers {
val actionConcurrency = 5
val actionMemory = 3
//use all concurrency on a single slot
- (1 to 5).par.map { i =>
- s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
shouldBe true
- }
+ concurrently(5, acquireTimeout) {
+ s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
+ } should contain only true
s.availablePermits shouldBe 20 - 3 //we used a single container (memory ==
3)
s.concurrentState(actionId).availablePermits shouldBe 0
//use up all the remaining memory (17) and concurrency slots (17 / 3 * 5 =
25)
- (1 to 25).par.map { i =>
- s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
shouldBe true
- }
+ concurrently(25, acquireTimeout) {
+ s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
+ } should contain only true
+
s.availablePermits shouldBe 2 //we used 18 (20/3 = 6, 6*3=18)
s.concurrentState(actionId).availablePermits shouldBe 0
s.tryAcquireConcurrent("action1", actionConcurrency, actionMemory)
shouldBe false
@@ -55,7 +64,7 @@ class NestedSemaphoreTests extends FlatSpec with Matchers {
(0 until 100).foreach { _ =>
val s = new NestedSemaphore(32)
// try to acquire more permits than allowed in parallel
- val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+ val acquires = concurrently(64, acquireTimeout)(s.tryAcquire())
val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
acquires should contain theSameElementsAs result
diff --git
a/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
index 19048c4..4115c0e 100644
---
a/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
@@ -17,13 +17,21 @@
package org.apache.openwhisk.common
+import common.ConcurrencyHelpers
+import org.apache.openwhisk.utils.ExecutionContextFactory
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
+import scala.concurrent.duration.DurationInt
+
@RunWith(classOf[JUnitRunner])
-class ResizableSemaphoreTests extends FlatSpec with Matchers {
+class ResizableSemaphoreTests extends FlatSpec with Matchers with
ConcurrencyHelpers {
+ // use an infinite thread pool to allow for maximum concurrency
+ implicit val executionContext =
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+ val acquireTimeout = 1.minute
+
behavior of "ResizableSemaphore"
it should "not allow to acquire, force or release negative amounts of
permits" in {
@@ -163,7 +171,7 @@ class ResizableSemaphoreTests extends FlatSpec with
Matchers {
(0 until 100).foreach { _ =>
val s = new ResizableSemaphore(32, 35)
// try to acquire more permits than allowed in parallel
- val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+ val acquires = concurrently(64, acquireTimeout)(s.tryAcquire())
val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
acquires should contain theSameElementsAs result
@@ -173,11 +181,10 @@ class ResizableSemaphoreTests extends FlatSpec with
Matchers {
it should "release permits even under concurrent load" in {
val s = new ResizableSemaphore(32, 35)
// try to acquire more permits than allowed in parallel
- val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+ concurrently(64, acquireTimeout)(s.tryAcquire())
+ concurrently(32, acquireTimeout)(s.release(1, true))
- (0 until 32).par.map(_ => s.release(1, true))
s.counter shouldBe 0
-
}
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index 9a6075e..9a1a96e 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -1161,7 +1161,7 @@ class ActionsApiTests extends ControllerTestCommon with
WhiskActionsApi {
action.publish,
action.annotations ++ systemAnnotations(kind))
- (0 until 5).par.map { i =>
+ (0 until 5).map { i =>
Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~>
check {
status should be(OK)
val response = responseAs[WhiskAction]
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
index 4995b0c..167bcc3 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
@@ -18,18 +18,12 @@
package org.apache.openwhisk.core.limits
import java.io.File
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.DurationInt
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-
-import common.TestHelpers
-import common.TestUtils
+import common.{ConcurrencyHelpers, TestHelpers, TestUtils, WskActorSystem,
WskProps, WskTestHelpers}
import common.rest.WskRestOperations
-import common.WskProps
-import common.WskTestHelpers
-import common.WskActorSystem
-
import org.apache.openwhisk.core.entity._
import spray.json.DefaultJsonProtocol._
import spray.json._
@@ -41,7 +35,7 @@ import org.scalatest.tagobjects.Slow
* Tests for action duration limits. These tests require a deployed backend.
*/
@RunWith(classOf[JUnitRunner])
-class MaxActionDurationTests extends TestHelpers with WskTestHelpers with
WskActorSystem {
+class MaxActionDurationTests extends TestHelpers with WskTestHelpers with
WskActorSystem with ConcurrencyHelpers {
implicit val wskprops = WskProps()
val wsk = new WskRestOperations
@@ -65,41 +59,41 @@ class MaxActionDurationTests extends TestHelpers with
WskTestHelpers with WskAct
"node-, python, and java-action" should s"run up to the max allowed duration
(${TimeLimit.MAX_DURATION})" taggedAs (Slow) in withAssetCleaner(
wskprops) { (wp, assetHelper) =>
// When you add more runtimes, keep in mind, how many actions can be
processed in parallel by the Invokers!
- Map("node" -> "helloDeadline.js", "python" -> "sleep.py", "java" ->
"sleep.jar")
+ val runtimes = Map("node" -> "helloDeadline.js", "python" -> "sleep.py",
"java" -> "sleep.jar")
.filter {
case (_, name) =>
new File(TestUtils.getTestActionFilename(name)).exists()
}
- .par
- .map {
- case (k, name) =>
- println(s"Testing action kind '${k}' with action '${name}'")
- assetHelper.withCleaner(wsk.action, name) { (action, _) =>
- val main = if (k == "java") Some("Sleep") else None
- action.create(
- name,
- Some(TestUtils.getTestActionFilename(name)),
- timeout = Some(TimeLimit.MAX_DURATION),
- main = main)
- }
- val run = wsk.action.invoke(
+ concurrently(runtimes.toSeq, TimeLimit.MAX_DURATION + 2.minutes) {
+ case (k, name) =>
+ println(s"Testing action kind '${k}' with action '${name}'")
+ assetHelper.withCleaner(wsk.action, name) { (action, _) =>
+ val main = if (k == "java") Some("Sleep") else None
+ action.create(
name,
- Map("forceHang" -> true.toJson, "sleepTimeInMs" ->
(TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson))
- withActivation(
- wsk.activation,
- run,
- initialWait = 1.minute,
- pollPeriod = 1.minute,
- totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation =>
- withClue("Activation result not as expected:") {
- activation.response.status shouldBe
ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
- activation.response.result shouldBe Some(
- JsObject("error" ->
Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson))
- activation.duration.toInt should be >=
TimeLimit.MAX_DURATION.toMillis.toInt
- }
+ Some(TestUtils.getTestActionFilename(name)),
+ timeout = Some(TimeLimit.MAX_DURATION),
+ main = main)
+ }
+
+ val run = wsk.action.invoke(
+ name,
+ Map("forceHang" -> true.toJson, "sleepTimeInMs" ->
(TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson))
+
+ withActivation(
+ wsk.activation,
+ run,
+ initialWait = 1.minute,
+ pollPeriod = 1.minute,
+ totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation =>
+ withClue("Activation result not as expected:") {
+ activation.response.status shouldBe
ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
+ activation.response.result shouldBe Some(
+ JsObject("error" ->
Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson))
+ activation.duration.toInt should be >=
TimeLimit.MAX_DURATION.toMillis.toInt
}
- () // explicitly map to Unit
- }
+ }
+ }
}
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 031e51a..dab21aa 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -497,7 +497,7 @@ class ShardingContainerPoolBalancerTests
val stepSize = stepSizes(hash % stepSizes.size)
val uuid = UUID()
//initiate activation
- val published = (0 until numActivations).par.map { _ =>
+ val published = (0 until numActivations).map { _ =>
val aid = ActivationId.generate()
val msg = ActivationMessage(
TransactionId.testing,
@@ -545,12 +545,12 @@ class ShardingContainerPoolBalancerTests
}
//complete all
- val acks = ids.par.map { aid =>
+ val acks = ids.map { aid =>
val invoker = balancer.activationSlots(aid).invokerName
completeActivation(invoker, balancer, aid)
}
- Await.ready(Future.sequence(acks.toList), 10.seconds)
+ Await.ready(Future.sequence(acks), 10.seconds)
//verify invokers go back to unused state
invokers.foreach { i =>