This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 82e01d7 Add the ability to define user-specific throttles. (#2533) 82e01d7 is described below commit 82e01d7b8896b35b68be81d5b622b5721d3bc80a Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Fri Jul 28 17:20:26 2017 +0200 Add the ability to define user-specific throttles. (#2533) This adds the ability to override the rate/concurrency related limits of the system on a per-namespace basis to be able to adapt to specific user needs. --- ansible/files/auth_index.json | 2 +- .../whisk/core/database/CouchDbRestStore.scala | 4 +- .../main/scala/whisk/core/entity/Identity.scala | 20 +++-- .../main/scala/whisk/core/entity/WhiskStore.scala | 2 +- .../core/entitlement/ActivationThrottler.scala | 7 +- .../scala/whisk/core/entitlement/Entitlement.scala | 4 +- .../whisk/core/entitlement/RateThrottler.scala | 20 +++-- tests/src/test/scala/limits/ThrottleTests.scala | 88 ++++++++++++++++++++++ .../core/controller/test/RateThrottleTests.scala | 16 +++- .../scala/whisk/core/entity/test/SchemaTests.scala | 4 +- tools/admin/wskadmin | 47 +++++++++++- 11 files changed, 183 insertions(+), 31 deletions(-) diff --git a/ansible/files/auth_index.json b/ansible/files/auth_index.json index b1c4711..15c40a8 100644 --- a/ansible/files/auth_index.json +++ b/ansible/files/auth_index.json @@ -2,7 +2,7 @@ "_id":"_design/subjects", "views": { "identities": { - "map": "function (doc) {\n if(doc.uuid && doc.key && !doc.blocked) {\n var v = {namespace: doc.subject, uuid: doc.uuid, key: doc.key};\n emit([doc.subject], v);\n emit([doc.uuid, doc.key], v);\n }\n if(doc.namespaces && !doc.blocked) {\n doc.namespaces.forEach(function(namespace) {\n var v = {namespace: namespace.name, uuid: namespace.uuid, key: namespace.key};\n emit([namespace.name], v);\n emit([namespace.uuid, namespace.key], v);\n });\n }\n}" + "map": "function (doc) {\n if(doc.uuid && doc.key && !doc.blocked) {\n var v = {namespace: doc.subject, uuid: 
doc.uuid, key: doc.key};\n emit([doc.subject], v);\n emit([doc.uuid, doc.key], v);\n }\n if(doc.namespaces && !doc.blocked) {\n doc.namespaces.forEach(function(namespace) {\n var v = {_id: namespace.name + '/limits', namespace: namespace.name, uuid: namespace.uuid, key: namespace.key};\n emit([namespace.name], v);\n emit([namespace.uuid, namesp [...] } }, "language":"javascript", diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala index 79bd898..99cecb0 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala @@ -203,9 +203,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer]( case Right(response) => val rows = response.fields("rows").convertTo[List[JsObject]] - val out = if (includeDocs) { - rows.map(_.fields("doc").asJsObject) - } else if (reduce && !rows.isEmpty) { + val out = if (reduce && !rows.isEmpty) { assert(rows.length == 1, s"result of reduced view contains more than one value: '$rows'") rows.head.fields("value").convertTo[List[JsObject]] } else if (reduce) { diff --git a/common/scala/src/main/scala/whisk/core/entity/Identity.scala b/common/scala/src/main/scala/whisk/core/entity/Identity.scala index 9fb0ef2..38bda86 100644 --- a/common/scala/src/main/scala/whisk/core/entity/Identity.scala +++ b/common/scala/src/main/scala/whisk/core/entity/Identity.scala @@ -27,8 +27,15 @@ import whisk.core.database.MultipleReadersSingleWriterCache import whisk.core.database.NoDocumentException import whisk.core.entitlement.Privilege import whisk.core.entitlement.Privilege.Privilege +import scala.util.Try -protected[core] case class Identity(subject: Subject, namespace: EntityName, authkey: AuthKey, rights: Set[Privilege]) { +case class UserLimits(invocationsPerMinute: Option[Int] = None, concurrentInvocations: Option[Int] = None, firesPerMinute: 
Option[Int] = None) + +object UserLimits extends DefaultJsonProtocol { + implicit val serdes = jsonFormat3(UserLimits.apply) +} + +protected[core] case class Identity(subject: Subject, namespace: EntityName, authkey: AuthKey, rights: Set[Privilege], limits: UserLimits = UserLimits()) { def uuid = authkey.uuid } @@ -38,7 +45,7 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with override val cacheEnabled = true override def cacheKeyForUpdate(i: Identity) = i.authkey - implicit val serdes = jsonFormat4(Identity.apply) + implicit val serdes = jsonFormat5(Identity.apply) /** * Retrieves a key for namespace. @@ -95,20 +102,21 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with endKey = key, skip = 0, limit = limit, - includeDocs = false, + includeDocs = true, descending = true, reduce = false) } private def rowToIdentity(row: JsObject, key: String)( implicit transid: TransactionId, logger: Logging) = { - row.getFields("id", "value") match { - case Seq(JsString(id), JsObject(value)) => + row.getFields("id", "value", "doc") match { + case Seq(JsString(id), JsObject(value), doc) => + val limits = Try(doc.convertTo[UserLimits]).getOrElse(UserLimits()) val subject = Subject(id) val JsString(uuid) = value("uuid") val JsString(secret) = value("key") val JsString(namespace) = value("namespace") - Identity(subject, EntityName(namespace), AuthKey(UUID(uuid), Secret(secret)), Privilege.ALL) + Identity(subject, EntityName(namespace), AuthKey(UUID(uuid), Secret(secret)), Privilege.ALL, limits) case _ => logger.error(this, s"$viewName[$key] has malformed view '${row.compactPrint}'") throw new IllegalStateException("identities view malformed") diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala index 39b354e..aa48419 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala +++ 
b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala @@ -294,7 +294,7 @@ object WhiskEntityQueries { db.query(view, startKey, endKey, skip, limit, includeDocs, true, reduce) map { rows => convert map { fn => - Right(rows flatMap { fn(_) toOption }) + Right(rows flatMap { row => fn(row.fields("doc").asJsObject) toOption }) } getOrElse { Left(rows flatMap { normalizeRow(_, reduce) toOption }) } diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala index a778607..f1a80d4 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala @@ -36,10 +36,10 @@ import whisk.core.loadBalancer.LoadBalancer * * @param config containing the config information needed (consulServer) */ -class ActivationThrottler(consulServer: String, loadBalancer: LoadBalancer, concurrencyLimit: Int, systemOverloadLimit: Int)( +class ActivationThrottler(consulServer: String, loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)( implicit val system: ActorSystem, logging: Logging) { - logging.info(this, s"concurrencyLimit = $concurrencyLimit, systemOverloadLimit = $systemOverloadLimit") + logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit") implicit private val executionContext = system.dispatcher @@ -58,7 +58,8 @@ class ActivationThrottler(consulServer: String, loadBalancer: LoadBalancer, conc */ def check(user: Identity)(implicit tid: TransactionId): Boolean = { val concurrentActivations = namespaceActivationCounter.getOrElse(user.uuid, 0) - logging.debug(this, s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit") + val concurrencyLimit = 
user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit) + logging.info(this, s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit") concurrentActivations < concurrencyLimit } diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala index ad4fd8f..176ebac 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala @@ -82,8 +82,8 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala private implicit val executionContext = actorSystem.dispatcher - private val invokeRateThrottler = new RateThrottler("actions per minute", config.actionInvokePerMinuteLimit.toInt) - private val triggerRateThrottler = new RateThrottler("triggers per minute", config.triggerFirePerMinuteLimit.toInt) + private val invokeRateThrottler = new RateThrottler("actions per minute", config.actionInvokePerMinuteLimit.toInt, _.limits.invocationsPerMinute) + private val triggerRateThrottler = new RateThrottler("triggers per minute", config.triggerFirePerMinuteLimit.toInt, _.limits.firesPerMinute) private val concurrentInvokeThrottler = new ActivationThrottler(config.consulServer, loadBalancer, config.actionInvokeConcurrentLimit.toInt, config.actionInvokeSystemOverloadLimit.toInt) /** diff --git a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala index ee32604..acd0230 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala @@ -29,9 +29,9 @@ import whisk.core.entity.UUID * * For now, we throttle only at a 1-minute granularity. 
*/ -class RateThrottler(description: String, maxPerMinute: Int)(implicit logging: Logging) { +class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPerMinute: Identity => Option[Int])(implicit logging: Logging) { - logging.info(this, s"$description: maxPerMinute = $maxPerMinute")(TransactionId.controller) + logging.info(this, s"$description: defaultMaxPerMinute = $defaultMaxPerMinute")(TransactionId.controller) /** * Maintains map of subject namespace to operations rates. @@ -47,9 +47,10 @@ class RateThrottler(description: String, maxPerMinute: Int)(implicit logging: Lo */ def check(user: Identity)(implicit transid: TransactionId): Boolean = { val uuid = user.uuid // this is namespace identifier - val rate = rateMap.getOrElseUpdate(uuid, new RateInfo(maxPerMinute)) - val belowLimit = rate.check() - logging.debug(this, s"namespace = ${uuid.asString} rate = ${rate.count()}, below limit = $belowLimit") + val rate = rateMap.getOrElseUpdate(uuid, new RateInfo) + val limit = overrideMaxPerMinute(user).getOrElse(defaultMaxPerMinute) + val belowLimit = rate.check(limit) + logging.debug(this, s"namespace = ${uuid.asString} rate = ${rate.count()}, limit = $limit, below limit = $belowLimit") belowLimit } } @@ -57,7 +58,7 @@ class RateThrottler(description: String, maxPerMinute: Int)(implicit logging: Lo /** * Tracks the activation rate of one subject at minute-granularity. */ -private class RateInfo(maxPerMinute: Int) { +private class RateInfo { var lastMin = getCurrentMinute var lastMinCount = 0 @@ -65,9 +66,12 @@ private class RateInfo(maxPerMinute: Int) { /** * Increments operation count in the current time window by - * one and checks if still below allowed max rate. + * one and checks if below allowed max rate. 
+ * + * @param maxPerMinute the current maximum allowed requests + * per minute (might change over time) */ - def check(): Boolean = { + def check(maxPerMinute: Int): Boolean = { roll() lastMinCount = lastMinCount + 1 lastMinCount <= maxPerMinute diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala index 69b32e3..7c37808 100644 --- a/tests/src/test/scala/limits/ThrottleTests.scala +++ b/tests/src/test/scala/limits/ThrottleTests.scala @@ -43,6 +43,8 @@ import spray.json._ import spray.json.DefaultJsonProtocol._ import whisk.http.Messages._ import whisk.utils.ExecutionContextFactory +import org.scalatest.BeforeAndAfterAll +import common.RunWskAdminCmd @RunWith(classOf[JUnitRunner]) class ThrottleTests @@ -278,3 +280,89 @@ class ThrottleTests waitForActivations(combinedResults.par) } } + +@RunWith(classOf[JUnitRunner]) +class NamespaceSpecificThrottleTests + extends FlatSpec + with TestHelpers + with WskTestHelpers + with Matchers + with BeforeAndAfterAll { + + val wskadmin = new RunWskAdminCmd {} + val wsk = new Wsk + + val defaultAction = Some(TestUtils.getTestActionFilename("hello.js")) + + // Create a subject with rate limits == 0 + val zeroProps = getAdditionalTestSubject("zeroSubject") + wskadmin.cli(Seq("limits", "set", zeroProps.namespace, "--invocationsPerMinute", "0", "--firesPerMinute", "0", "--concurrentInvocations", "0")) + + // Create a subject where only the concurrency limit is set to 0 + val zeroConcProps = getAdditionalTestSubject("zeroConcSubject") + wskadmin.cli(Seq("limits", "set", zeroConcProps.namespace, "--concurrentInvocations", "0")) + + // Create a subject where the rate limits are set to 1 + val oneProps = getAdditionalTestSubject("oneSubject") + wskadmin.cli(Seq("limits", "set", oneProps.namespace, "--invocationsPerMinute", "1", "--firesPerMinute", "1")) + + override def afterAll() = { + disposeAdditionalTestSubject(zeroProps.namespace) + 
disposeAdditionalTestSubject(zeroConcProps.namespace) + disposeAdditionalTestSubject(oneProps.namespace) + } + + behavior of "Namespace-specific throttles" + + it should "respect overridden rate-throttles of 0" in withAssetCleaner(zeroProps) { + (wp, assetHelper) => + implicit val props = wp + val triggerName = "zeroTrigger" + val actionName = "zeroAction" + + assetHelper.withCleaner(wsk.action, actionName) { + (action, _) => action.create(actionName, defaultAction) + } + assetHelper.withCleaner(wsk.trigger, triggerName) { + (trigger, _) => trigger.create(triggerName) + } + + wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests) + wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests) + } + + it should "respect overridden rate-throttles of 1" in withAssetCleaner(oneProps) { + (wp, assetHelper) => + implicit val props = wp + val triggerName = "oneTrigger" + val actionName = "oneAction" + + assetHelper.withCleaner(wsk.action, actionName) { + (action, _) => action.create(actionName, defaultAction) + } + assetHelper.withCleaner(wsk.trigger, triggerName) { + (trigger, _) => trigger.create(triggerName) + } + + // One invoke should be allowed, the second one throttled + wsk.action.invoke(actionName) + wsk.action.invoke(actionName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests) + + // One fire should be allowed, the second one throttled + wsk.trigger.fire(triggerName) + wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.THROTTLED).stderr should include(tooManyRequests) + } + + it should "respect overridden concurrent throttle of 0" in withAssetCleaner(zeroConcProps) { + (wp, assetHelper) => + implicit val props = wp + val actionName = "zeroConcurrentAction" + + assetHelper.withCleaner(wsk.action, actionName) { + (action, _) => action.create(actionName, defaultAction) + } + + wsk.action.invoke(actionName, expectedExitCode = 
TestUtils.THROTTLED).stderr should include(tooManyConcurrentRequests) + } + +} diff --git a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala index c79848d..45cbac5 100644 --- a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala @@ -27,6 +27,7 @@ import org.scalatest.junit.JUnitRunner import common.StreamLogging import whisk.common.TransactionId import whisk.core.entitlement._ +import whisk.core.entity.UserLimits /** * Tests rate throttle. @@ -46,8 +47,8 @@ class RateThrottleTests behavior of "Rate Throttle" it should "throttle when rate exceeds allowed threshold" in { - new RateThrottler("test", 0).check(subject) shouldBe false - val rt = new RateThrottler("test", 1) + new RateThrottler("test", 0, _.limits.invocationsPerMinute).check(subject) shouldBe false + val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute) rt.check(subject) shouldBe true rt.check(subject) shouldBe false rt.check(subject) shouldBe false @@ -55,4 +56,15 @@ class RateThrottleTests rt.check(subject) shouldBe true } + it should "check against an alternative limit if passed in" in { + val withLimits = subject.copy(limits = UserLimits(invocationsPerMinute = Some(5))) + val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute) + rt.check(withLimits) shouldBe true // 1 + rt.check(withLimits) shouldBe true // 2 + rt.check(withLimits) shouldBe true // 3 + rt.check(withLimits) shouldBe true // 4 + rt.check(withLimits) shouldBe true // 5 + rt.check(withLimits) shouldBe false + } + } diff --git a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala index e198baf..3d46f2b 100644 --- a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala @@ 
-42,7 +42,6 @@ import whisk.core.entity.size.SizeInt import whisk.http.Messages import whisk.utils.JsHelpers - @RunWith(classOf[JUnitRunner]) class SchemaTests extends FlatSpec @@ -84,7 +83,8 @@ class SchemaTests "subject" -> i.subject.asString.toJson, "namespace" -> i.namespace.toJson, "authkey" -> i.authkey.compact.toJson, - "rights" -> Array("READ", "PUT", "DELETE", "ACTIVATE").toJson) + "rights" -> Array("READ", "PUT", "DELETE", "ACTIVATE").toJson, + "limits" -> JsObject()) Identity.serdes.write(i) shouldBe expected Identity.serdes.read(expected) shouldBe i } diff --git a/tools/admin/wskadmin b/tools/admin/wskadmin index c149f04..3110eaf 100755 --- a/tools/admin/wskadmin +++ b/tools/admin/wskadmin @@ -32,6 +32,7 @@ import sys import traceback import uuid import wskprop +import urllib try: import argcomplete except ImportError: @@ -72,7 +73,8 @@ def main(): exitCode = { 'user' : userCmd, 'db' : dbCmd, - 'syslog' : syslogCmd + 'syslog' : syslogCmd, + 'limits': limitsCmd }[args.cmd](args, props) except Exception as e: print('Exception: ', e) @@ -116,6 +118,15 @@ def parseArgs(): subcmd.add_argument('-p', '--pick', metavar='N', help='show no more than N identities', type=int, default=1) subcmd.add_argument('-k', '--key', help='show only the keys', action='store_true') + propmenu = subparsers.add_parser('limits', help='manage namespace-specific limits') + subparser = propmenu.add_subparsers(title='available commands', dest='subcmd') + + subcmd = subparser.add_parser('set', help='set limits for a given namespace') + subcmd.add_argument('namespace', help='the namespace to set limits for') + subcmd.add_argument('--invocationsPerMinute', help='invocations per minute allowed', type=int) + subcmd.add_argument('--firesPerMinute', help='trigger fires per minute allowed', type=int) + subcmd.add_argument('--concurrentInvocations', help='concurrent invocations allowed for this namespace', type=int) + propmenu = subparsers.add_parser('db', help='work with dbs') subparser = 
propmenu.add_subparsers(title='available commands', dest='subcmd') @@ -169,6 +180,13 @@ def syslogCmd(args, props): print('unknown command') return 2 +def limitsCmd(args, props): + if args.subcmd == 'set': + return setLimitsCmd(args, props) + else: + print('unknown command') + return 2 + def createUserCmd(args, props): subject = args.subject.strip() if len(subject) < 5: @@ -275,6 +293,9 @@ def listUserCmd(args, props): return 1 def getSubjectFromDb(args, props): + return getDocumentFromDb(props, args.subject, args.verbose) + +def getDocumentFromDb(props, doc, verbose): protocol = props[DB_PROTOCOL] host = props[DB_HOST] port = props[DB_PORT] @@ -287,14 +308,14 @@ def getSubjectFromDb(args, props): 'host' : host, 'port' : port, 'database': database, - 'subject' : args.subject + 'subject' : doc } headers = { 'Content-Type': 'application/json', } - res = request('GET', url, headers=headers, auth='%s:%s' % (username, password), verbose=args.verbose) + res = request('GET', url, headers=headers, auth='%s:%s' % (username, password), verbose=verbose) if res.status == 200: doc = json.loads(res.read()) return (doc, res) @@ -459,6 +480,26 @@ def unblockUserCmd(args, props): print('Failed to get subject (%s)' % res.read().strip()) return 1 +def setLimitsCmd(args, props): + argsDict = vars(args) + docId = args.namespace + "/limits" + (dbDoc, res) = getDocumentFromDb(props, urllib.quote_plus(docId), args.verbose) + doc = dbDoc or {'_id': docId} + + limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations'] + for limit in limits: + givenLimit = argsDict.get(limit) + toSet = givenLimit if givenLimit != None else doc.get(limit) + if toSet != None: + doc[limit] = toSet + + res = insertIntoDatabase(props, doc, args.verbose) + if res.status in [201, 202]: + print('Limits successfully set for "%s"' % args.namespace) + else: + print('Failed to set limits (%s)' % res.read().strip()) + return 1 + def getDbCmd(args, props): protocol = props[DB_PROTOCOL] host = 
props[DB_HOST] -- To stop receiving notification emails like this one, please contact "commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.