markusthoemmes closed pull request #3789: Refactoring the load balancer with an 
overflow queue to reduce activation waiting time
URL: https://github.com/apache/incubator-openwhisk/pull/3789
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:

diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 6aa1bbd890..7a43100732 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -102,6 +102,12 @@ whisk {
                 retention-ms      =  172800000
                 max-message-bytes = ${whisk.activation.payload.max}
             }
+            overflow {
+                segment-bytes     =  536870912
+                retention-bytes   = 1073741824
+                retention-ms      =  172800000
+                max-message-bytes = ${whisk.activation.payload.max}
+            }
             events {
                 segment-bytes   =  536870912
                 retention-bytes = 1073741824
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala 
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index b17de8f31a..d0b36f275f 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -19,6 +19,7 @@ package whisk.core.connector
 
 import scala.util.Try
 import spray.json._
+import spray.json.DefaultJsonProtocol
 import whisk.common.TransactionId
 import whisk.core.entity._
 
@@ -52,6 +53,15 @@ case class ActivationMessage(override val transid: 
TransactionId,
                              cause: Option[ActivationId] = None)
     extends Message {
 
+  def meta =
+    JsObject("meta" -> {
+      cause map { c =>
+        JsObject(c.toJsObject.fields ++ activationId.toJsObject.fields)
+      } getOrElse {
+        activationId.toJsObject
+      }
+    })
+
   override def serialize = ActivationMessage.serdes.write(this).compactPrint
 
   override def toString = {
@@ -117,6 +127,23 @@ object PingMessage extends DefaultJsonProtocol {
   implicit val serdes = jsonFormat(PingMessage.apply _, "name")
 }
 
+case class OverflowMessage(override val transid: TransactionId,
+                           msg: ActivationMessage,
+                           actionTimeoutSeconds: Int,
+                           pull: Boolean,
+                           hash: Int)
+    extends Message {
+
+  override def serialize: String = {
+    OverflowMessage.serdes.write(this).compactPrint
+  }
+}
+
+object OverflowMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[OverflowMessage] = 
Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat5(OverflowMessage.apply)
+}
+
 trait EventMessageBody extends Message {
   def typeName: String
 }
diff --git 
a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala 
b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 4c21545f41..ce36e7756d 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -17,6 +17,7 @@
 
 package whisk.core.connector
 
+import akka.actor.ActorRef
 import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.concurrent.Future
@@ -70,6 +71,9 @@ object MessageFeed {
   /** Steady state message, indicates capacity in downstream process to 
receive more messages. */
   object Processed
 
+  /** message to indicate max offset is reached */
+  object MaxOffset
+
   /** Indicates the fill operation has completed. */
   private case class FillCompleted(messages: Seq[(String, Int, Long, 
Array[Byte])])
 }
@@ -97,7 +101,8 @@ class MessageFeed(description: String,
                   longPollDuration: FiniteDuration,
                   handler: Array[Byte] => Future[Unit],
                   autoStart: Boolean = true,
-                  logHandoff: Boolean = true)
+                  logHandoff: Boolean = true,
+                  offsetMonitor: Option[ActorRef] = None)
     extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
   import MessageFeed._
 
@@ -185,6 +190,10 @@ class MessageFeed(description: String,
           // of the commit should be masked.
           val records = consumer.peek(longPollDuration)
           consumer.commit()
+          if (records.size < maxPipelineDepth) {
+            //reached the max offset
+            offsetMonitor.foreach(_ ! MaxOffset)
+          }
           FillCompleted(records.toSeq)
         }
       }.andThen {
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/Controller.scala 
b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index a8b6a4e960..17c042e93d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -222,6 +222,7 @@ object Controller {
       "completed" + instance -> "completed",
       "health" -> "health",
       "cacheInvalidation" -> "cache-invalidation",
+      "overflow" -> "overflow",
       "events" -> "events").foreach {
       case (topic, topicConfigurationKey) =>
         if (msgProvider.ensureTopic(config, topic, 
topicConfigurationKey).isFailure) {
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index bad384288b..93fc6443fa 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -18,9 +18,8 @@
 package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.LongAdder
-
+import java.util.concurrent.atomic.AtomicBoolean
 import akka.actor.{Actor, ActorSystem, Cancellable, Props}
 import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
@@ -74,6 +73,9 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
 
+  var localOverflowActivationCount: Int = 0
+  val overflowState = new AtomicBoolean(false)
+
   /** State needed for scheduling. */
   private val schedulingState = ShardingContainerPoolBalancerState()()
 
@@ -133,38 +135,91 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
   /** 1. Publish a message to the loadbalancer */
   override def publish(action: ExecutableWhiskActionMetaData, msg: 
ActivationMessage)(
     implicit transid: TransactionId): Future[Future[Either[ActivationId, 
WhiskActivation]]] = {
+    val hash = 
ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name, 
action.fullyQualifiedName(false))
+    publish(msg.transid, action.limits.timeout.duration.toSeconds.toInt, 
action.exec.pull, hash, msg, false)
+  }
 
+  def publish(
+    transid: TransactionId,
+    actionTimeoutSeconds: Int,
+    pull: Boolean,
+    hash: Int,
+    msg: ActivationMessage,
+    isRescheduled: Boolean): Future[Future[Either[ActivationId, 
WhiskActivation]]] = {
+    logging.info(this, s"Publish activation ${msg.activationId} with & 
isRescheduled= $isRescheduled")
     val (invokersToUse, stepSizes) =
-      if (!action.exec.pull) (schedulingState.managedInvokers, 
schedulingState.managedStepSizes)
+      if (!pull) (schedulingState.managedInvokers, 
schedulingState.managedStepSizes)
       else (schedulingState.blackboxInvokers, 
schedulingState.blackboxStepSizes)
+
     val chosen = if (invokersToUse.nonEmpty) {
-      val hash = 
ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name, 
action.fullyQualifiedName(false))
-      val homeInvoker = hash % invokersToUse.size
-      val stepSize = stepSizes(hash % stepSizes.size)
-      ShardingContainerPoolBalancer.schedule(invokersToUse, 
schedulingState.invokerSlots, homeInvoker, stepSize)
+      // If the system isn't in the overflow state, or it is in the overflow 
state but the message is rescheduled,
+      // we want to call the scheduling algorithm to choose an invoker
+      if (!overflowState.get() || (overflowState.get() && isRescheduled)) {
+        val homeInvoker = hash % invokersToUse.size
+        val stepSize = stepSizes(hash % stepSizes.size)
+        ShardingContainerPoolBalancer.schedule(invokersToUse, 
schedulingState.invokerSlots, homeInvoker, stepSize)
+      } else {
+        // If the system is in the overflow state and the 
ActivationMessage isn't rescheduled,
+        // return -1 as the invoker index
+        Some(new InstanceId(-1))
+      }
     } else {
       None
     }
 
     chosen
       .map { invoker =>
-        val entry = setupActivation(msg, action, invoker)
-        sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
-          entry.promise.future
+        // this means that the rescheduled message failed to find an 
invoker that has capacity
+        if (invoker.toInt == -1 && isRescheduled) {
+          // don't remove the message from the overflow topic
+          // TODO: requeue the ActivationMessage into the overflow topic again?
+          logging.error(this, "The re-scheduled ActivationMessage hasn't find 
any invoker that has capacity")
+          Future.failed(
+            LoadBalancerException("The rescheduled ActivationMessage hasn't 
find any invoker that has capacity"))
+        }
+
+        // this means that there is no invoker that has capacity for this 
non-rescheduled ActivationMessage (enter the overflow state)
+        else if (invoker.toInt == -1 && !isRescheduled) {
+          logging.info(this, s"The sceduled ActivationMessage hasn't find any 
invoker that has capacity")
+          if (overflowState.compareAndSet(false, true)) {
+            logging.info(this, "Entering overflow state")
+          }
+          val entry = setupActivation(msg, actionTimeoutSeconds.seconds, 
invoker, false)
+          sendActivationToOverflow(messageProducer, OverflowMessage(transid, 
msg, actionTimeoutSeconds, pull, hash))
+            .map { _ =>
+              entry.promise.future
+            }
+        }
+
+        // there is an invoker that has capacity to handle the action
+        else {
+          logging.info(this, s"There is an invoker to be assigned with 
activation id ${msg.activationId}")
+          val entry = setupActivation(msg, actionTimeoutSeconds.seconds, 
invoker, isRescheduled)
+          if (isRescheduled) {
+            logging.info(this, s"Commit rescheduled message with activation id 
'${msg.activationId}'")
+            overflowConsumer.commit()
+          }
+          sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
+            entry.promise.future
+          }
         }
       }
-      .getOrElse(Future.failed(LoadBalancerException("No invokers available")))
+      .getOrElse(Future.failed(LoadBalancerException("No invokers available in 
the system")))
   }
 
   /** 2. Update local state with the to be executed activation */
-  private def setupActivation(msg: ActivationMessage,
-                              action: ExecutableWhiskActionMetaData,
-                              instance: InstanceId): ActivationEntry = {
-
-    totalActivations.increment()
-    activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new 
LongAdder()).increment()
+  private def setupActivation(
+    msg: ActivationMessage,
+    actionTimeout: FiniteDuration,
+    instance: InstanceId,
+    isRescheduled: Boolean = false): ActivationEntry = {
+    // Increment the activations count in case of a new activation (not 
rescheduled)
+    if (!isRescheduled) {
+      totalActivations.increment()
+      activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new 
LongAdder()).increment()
+    }
 
-    val timeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 
1.minute
+    val timeout = actionTimeout.max(TimeLimit.STD_DURATION) + 1.minute
     // Install a timeout handler for the catastrophic case where an active ack 
is not received at all
     // (because say an invoker is down completely, or the connection to the 
message bus is disrupted) or when
     // the active ack is significantly delayed (possibly dues to long queues 
but the subject should not be penalized);
@@ -189,9 +244,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
   private val messageProducer = messagingProvider.getProducer(config)
 
   /** 3. Send the activation to the invoker */
-  private def sendActivationToInvoker(producer: MessageProducer,
-                                      msg: ActivationMessage,
-                                      invoker: InstanceId): 
Future[RecordMetadata] = {
+  private def sendActivationToInvoker(
+    producer: MessageProducer,
+    msg: ActivationMessage,
+    invoker: InstanceId): Future[RecordMetadata] = {
     implicit val transid: TransactionId = msg.transid
 
     val topic = s"invoker${invoker.toInt}"
@@ -214,6 +270,25 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
     }
   }
 
+  /** Send the Overflow Message to the Overflow topic */
+  private def sendActivationToOverflow(producer: MessageProducer, msg: 
OverflowMessage): Future[RecordMetadata] = {
+    implicit val transid = msg.transid
+
+    val topic = s"overflow"
+    
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
+    val start = transid.started(
+      this,
+      LoggingMarkers.CONTROLLER_KAFKA,
+      s"posting overflow topic '$topic' with activation id 
'${msg.msg.activationId}'")
+
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        localOverflowActivationCount += 1
+        transid.finished(this, start, s"posted to 
${status.topic()}[${status.partition()}][${status.offset()}]")
+      case Failure(e) => transid.failed(this, start, s"error on posting to 
topic $topic")
+    }
+  }
+
   /**
    * Subscribes to active acks (completion messages from the invokers), and
    * registers a handler for received active acks from invokers.
@@ -249,10 +324,11 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
   }
 
   /** 5. Process the active-ack and update the state accordingly */
-  private def processCompletion(response: Either[ActivationId, 
WhiskActivation],
-                                tid: TransactionId,
-                                forced: Boolean,
-                                invoker: InstanceId): Unit = {
+  private def processCompletion(
+    response: Either[ActivationId, WhiskActivation],
+    tid: TransactionId,
+    forced: Boolean,
+    invoker: InstanceId): Unit = {
     val aid = response.fold(l => l, r => r.activationId)
 
     // treat left as success (as it is the result of a message exceeding the 
bus limit)
@@ -263,8 +339,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
         totalActivations.decrement()
         activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
         schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
-
         if (!forced) {
+          // if the balancer is in overflow state then consume an overflow 
message using consumer
+          consumeOverflowMessage()
+
           entry.timeoutHandler.cancel()
           entry.promise.trySuccess(response)
         } else {
@@ -276,6 +354,9 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
         // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
         invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
       case None if !forced =>
+        // if the balancer is in overflow state then consume an overflow 
message using consumer
+        consumeOverflowMessage()
+
         // the entry has already been removed but we receive an active ack for 
this activation Id.
         // This happens for health actions, because they don't have an entry 
in Loadbalancerdata or
         // for activations that already timed out.
@@ -288,6 +369,46 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
     }
   }
 
+  private def consumeOverflowMessage() {
+    if (overflowState.get()) {
+      logging.info(this, "Consume message from overflow topic after completion 
of an activation")
+
+      localOverflowActivationCount -= 1
+      // when the overflow activation count = 0, then release the overflow 
state
+      if (overflowState.get() && localOverflowActivationCount == 0 && 
overflowState.compareAndSet(true, false)) {
+        logging.info(this, "Removing overflow state after processing 
outstanding overflow messages")
+      }
+      overflowConsumer.peek(1.seconds).map { case (_, _, _, msg) => 
processOverflowMsg(msg) }
+    }
+  }
+
+  /**
+   * Subscribes to overflow messages (in the case all invokers are overloaded 
but a
+   * successful completion message has been received indicating that there is 
capacity on
+   * one of the invokers)
+   */
+  private val overflowTopic = "overflow"
+  private val maxOverflowPerPoll = 1
+  private val overflowPollDuration = 1.second
+  private val overflowConsumer =
+    messagingProvider.getConsumer(config, overflowTopic, overflowTopic, 
maxPeek = maxOverflowPerPoll)
+
+  /* Process the overflow message and re-publish it */
+  private def processOverflowMsg(bytes: Array[Byte]): Future[Unit] = Future {
+    val raw = new String(bytes, StandardCharsets.UTF_8)
+    OverflowMessage.parse(raw) match {
+      case Success(m: OverflowMessage) =>
+        processOverflow(m)
+      case Failure(t) =>
+        logging.error(this, s"Failed processing overflow message: $raw with 
$t")
+    }
+  }
+
+  private def processOverflow(msg: OverflowMessage): Unit = {
+    logging.info(this, s"Process overflow message for activation 
'${msg.msg.activationId}' and re-publish again")
+    publish(msg.transid, msg.actionTimeoutSeconds, msg.pull, msg.hash, 
msg.msg, true)
+  }
+
   private val invokerPool = {
     InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore())
 
@@ -337,11 +458,12 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
    * @return an invoker to schedule to or None of no invoker is available
    */
   @tailrec
-  def schedule(invokers: IndexedSeq[InvokerHealth],
-               dispatched: IndexedSeq[ForcableSemaphore],
-               index: Int,
-               step: Int,
-               stepsDone: Int = 0)(implicit logging: Logging): 
Option[InstanceId] = {
+  def schedule(
+    invokers: IndexedSeq[InvokerHealth],
+    dispatched: IndexedSeq[ForcableSemaphore],
+    index: Int,
+    step: Int,
+    stepsDone: Int = 0)(implicit logging: Logging): Option[InstanceId] = {
     val numInvokers = invokers.size
 
     if (numInvokers > 0) {
@@ -352,16 +474,10 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
       } else {
         // If we've gone through all invokers
         if (stepsDone == numInvokers + 1) {
-          val healthyInvokers = invokers.filter(_.status == Healthy)
-          if (healthyInvokers.nonEmpty) {
-            // Choose a healthy invoker randomly
-            val random = 
healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
-            dispatched(random.toInt).forceAcquire()
-            logging.warn(this, s"system is overloaded. Chose 
invoker${random.toInt} by random assignment.")
-            Some(random)
-          } else {
-            None
-          }
+          // return -1 if there are invokers but none is available;
+          // the caller should then enter the overflow state
+          Some(new InstanceId(-1))
+
         } else {
           val newIndex = (index + step) % numInvokers
           schedule(invokers, dispatched, newIndex, step, stepsDone + 1)
@@ -499,8 +615,9 @@ case class 
ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invoker
  * @param timeoutHandler times out completion of this activation, should be 
canceled on good paths
  * @param promise the promise to be completed by the activation
  */
-case class ActivationEntry(id: ActivationId,
-                           namespaceId: UUID,
-                           invokerName: InstanceId,
-                           timeoutHandler: Cancellable,
-                           promise: Promise[Either[ActivationId, 
WhiskActivation]])
+case class ActivationEntry(
+  id: ActivationId,
+  namespaceId: UUID,
+  invokerName: InstanceId,
+  timeoutHandler: Cancellable,
+  promise: Promise[Either[ActivationId, WhiskActivation]])
diff --git a/gradle/docker.gradle b/gradle/docker.gradle
index 6ad6850629..bab49fb8e4 100644
--- a/gradle/docker.gradle
+++ b/gradle/docker.gradle
@@ -42,7 +42,7 @@ ext {
     dockerRegistry = project.hasProperty('dockerRegistry') ? dockerRegistry + 
'/' : ''
     dockerImageTag = project.hasProperty('dockerImageTag') ? dockerImageTag : 
'latest'
     dockerImagePrefix = project.hasProperty('dockerImagePrefix') ? 
dockerImagePrefix : 'whisk'
-    dockerTimeout = project.hasProperty('dockerTimeout') ? 
dockerTimeout.toInteger() : 840
+    dockerTimeout = project.hasProperty('dockerTimeout') ? 
dockerTimeout.toInteger() : 8400
     dockerRetries = project.hasProperty('dockerRetries') ? 
dockerRetries.toInteger() : 3
     dockerBinary = project.hasProperty('dockerBinary') ? [dockerBinary] : 
['docker']
     dockerBuildArg = ['build']
diff --git 
a/tests/src/test/scala/system/basic/WskRestActivationTests.scala~7e81011234d9c68ef9de890ca8191cd69823e47e
 
b/tests/src/test/scala/system/basic/WskRestActivationTests.scala~7e81011234d9c68ef9de890ca8191cd69823e47e
new file mode 100644
index 0000000000..50449383ab
--- /dev/null
+++ 
b/tests/src/test/scala/system/basic/WskRestActivationTests.scala~7e81011234d9c68ef9de890ca8191cd69823e47e
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package system.basic
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+import common.rest.WskRest
+
+@RunWith(classOf[JUnitRunner])
+class WskRestActivationTests extends WskActivationTests {
+  override val wsk: WskRest = new WskRest
+}
diff --git 
a/tests/src/test/scala/system/basic/WskRestActivationTests.scala~apply changes 
after installaion 
b/tests/src/test/scala/system/basic/WskRestActivationTests.scala~apply changes 
after installaion
new file mode 100644
index 0000000000..50449383ab
--- /dev/null
+++ b/tests/src/test/scala/system/basic/WskRestActivationTests.scala~apply 
changes after installaion    
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package system.basic
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+import common.rest.WskRest
+
+@RunWith(classOf[JUnitRunner])
+class WskRestActivationTests extends WskActivationTests {
+  override val wsk: WskRest = new WskRest
+}
diff --git 
a/tests/src/test/scala/system/basic/WskRestBasicSwift41Tests.scala~7e81011234d9c68ef9de890ca8191cd69823e47e
 
b/tests/src/test/scala/system/basic/WskRestBasicSwift41Tests.scala~7e81011234d9c68ef9de890ca8191cd69823e47e
new file mode 100644
index 0000000000..2a44ccb56a
--- /dev/null
+++ 
b/tests/src/test/scala/system/basic/WskRestBasicSwift41Tests.scala~7e81011234d9c68ef9de890ca8191cd69823e47e
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package system.basic
+
+import common.rest.WskRest
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class WskRestBasicSwift41Tests extends WskBasicSwift3Tests {
+  override val wsk: common.rest.WskRest = new WskRest
+  override lazy val actionKind = "swift:4.1"
+
+}
diff --git 
a/tests/src/test/scala/system/basic/WskRestBasicSwift41Tests.scala~apply 
changes after installaion 
b/tests/src/test/scala/system/basic/WskRestBasicSwift41Tests.scala~apply 
changes after installaion
new file mode 100644
index 0000000000..2a44ccb56a
--- /dev/null
+++ b/tests/src/test/scala/system/basic/WskRestBasicSwift41Tests.scala~apply 
changes after installaion  
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package system.basic
+
+import common.rest.WskRest
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class WskRestBasicSwift41Tests extends WskBasicSwift3Tests {
+  override val wsk: common.rest.WskRest = new WskRest
+  override lazy val actionKind = "swift:4.1"
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to