This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 8af91d1 Add a limiter so we can throttle access
new a4051cd Merge pull request #324 from atoulme/add_concurrent_requests_throttling
8af91d1 is described below
commit 8af91d125bd01a4a7a184f20262d30a665856d43
Author: Antoine Toulme <[email protected]>
AuthorDate: Thu Jul 29 14:55:38 2021 -0700
Add a limiter so we can throttle access
---
dependency-versions.gradle | 1 +
jsonrpc/build.gradle | 1 +
.../tuweni/jsonrpc/methods/MethodsHandler.kt | 26 ++++++++++++++
.../tuweni/jsonrpc/methods/MethodsHandlerTest.kt | 42 ++++++++++++++++++++++
4 files changed, 70 insertions(+)
diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index 6f08571..a3841a9 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -24,6 +24,7 @@ dependencyManagement {
dependency('com.google.guava:guava:27.0.1-jre')
dependency('com.h2database:h2:1.4.197')
dependency('com.jolbox:bonecp:0.8.0.RELEASE')
+ dependency('com.netflix.concurrency-limits:concurrency-limits-core:0.3.6')
dependency('com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0')
dependency('com.opentable.components:otj-pg-embedded:0.13.3')
dependency('com.squareup.okhttp3:okhttp:3.12.0')
diff --git a/jsonrpc/build.gradle b/jsonrpc/build.gradle
index e2d2da2..e09379c 100644
--- a/jsonrpc/build.gradle
+++ b/jsonrpc/build.gradle
@@ -16,6 +16,7 @@ dependencies {
implementation 'org.slf4j:slf4j-api'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation "com.google.guava:guava"
+ implementation 'com.netflix.concurrency-limits:concurrency-limits-core'
implementation "org.jetbrains.kotlin:kotlin-stdlib"
implementation 'io.opentelemetry:opentelemetry-api-metrics'
implementation 'io.opentelemetry:opentelemetry-sdk-metrics'
diff --git
a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
index 9dc92a0..3419b98 100644
---
a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
+++
b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
@@ -16,6 +16,8 @@
*/
package org.apache.tuweni.jsonrpc.methods
+import com.netflix.concurrency.limits.limit.FixedLimit
+import com.netflix.concurrency.limits.limiter.SimpleLimiter
import io.opentelemetry.api.metrics.LongCounter
import io.opentelemetry.api.metrics.common.Labels
import org.apache.tuweni.eth.JSONRPCError
@@ -66,6 +68,30 @@ class MethodAllowListHandler(private val allowedMethods:
List<String>, private v
}
}
+/**
+ * Handler that asks a limiter for a permit before forwarding a request to a delegate handler.
+ *
+ * The limiter is a [SimpleLimiter] built with a [FixedLimit] of `threshold`.
+ *
+ * @param threshold the value passed to [FixedLimit.of] when the limiter is built
+ * @param delegateHandler the handler invoked for every request that obtains a permit
+ */
+class ThrottlingHandler(
+  private val threshold: Int,
+  private val delegateHandler: (JSONRPCRequest) -> JSONRPCResponse
+) {
+
+  private val limiter: SimpleLimiter<Void> = SimpleLimiter
+    .newBuilder()
+    .limit(FixedLimit.of(threshold))
+    .build()
+
+  /**
+   * Forwards the request to the delegate if the limiter grants a permit.
+   *
+   * @param request the incoming JSON-RPC request
+   * @return the delegate's response, or an error response with code -32000 carrying the request id
+   * when the limiter grants no permit
+   * @throws RuntimeException wrapping anything thrown by the delegate
+   */
+  fun handleRequest(request: JSONRPCRequest): JSONRPCResponse {
+    val permit = limiter.acquire(null)
+    if (!permit.isPresent) {
+      // The limiter refused the request: answer immediately without calling the delegate.
+      return JSONRPCResponse(id = request.id, error = JSONRPCError(code = -32000, message = "Too many requests"))
+    }
+    val slot = permit.get()
+    return try {
+      // Signal success to the limiter only once the delegate has produced a response.
+      delegateHandler(request).also { slot.onSuccess() }
+    } catch (failure: Throwable) {
+      // Any failure is reported to the limiter as a drop and rethrown wrapped.
+      slot.onDropped()
+      throw RuntimeException(failure)
+    }
+  }
+}
+
// TODO DelegateHandler - choose from a number of handlers to see which to delegate to.
// TODO FilterHandler - filter incoming requests per allowlist
// TODO CachingHandler - cache some incoming requests
diff --git
a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
index c9778f8..9441a10 100644
---
a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
+++
b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
@@ -20,6 +20,9 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader
import io.opentelemetry.sdk.metrics.export.MetricProducer
import io.opentelemetry.sdk.metrics.testing.InMemoryMetricExporter
+import kotlinx.coroutines.async
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
import org.apache.tuweni.eth.JSONRPCError
import org.apache.tuweni.eth.JSONRPCRequest
import org.apache.tuweni.eth.JSONRPCResponse
@@ -123,3 +126,42 @@ class MethodAllowListHandlerTest {
assertEquals("Method not enabled", respContents.message)
}
}
+
+class ThrottlingHandlerTest { // Exercises ThrottlingHandler with a threshold of 4 and a delegate that takes about 500 ms.
+ // The delegate always answers with id 1, whatever the id of the incoming request.
+ @Test
+ fun testThrottling(): Unit = runBlocking { // the async blocks below are launched inside this runBlocking scope
+ val handler = ThrottlingHandler(4) { // threshold of 4
+ runBlocking {
+ delay(500) // keeps the request in flight for 500 ms before answering
+ JSONRPCResponse(id = 1)
+ }
+ }
+ async { // requests 1 to 4 are issued without an initial delay and expect the delegate's response
+ val response = handler.handleRequest(JSONRPCRequest(2, "foo", arrayOf()))
+ assertEquals(1, response.id)
+ }
+ async {
+ val response = handler.handleRequest(JSONRPCRequest(3, "foo", arrayOf()))
+ assertEquals(1, response.id)
+ }
+ async {
+ val response = handler.handleRequest(JSONRPCRequest(4, "foo", arrayOf()))
+ assertEquals(1, response.id)
+ }
+ async {
+ val response = handler.handleRequest(JSONRPCRequest(5, "foo", arrayOf()))
+ assertEquals(1, response.id)
+ }
+ async { // fifth request, issued after 200 ms
+ delay(200)
+ val response = handler.handleRequest(JSONRPCRequest(6, "foo", arrayOf()))
+ assertEquals(-32000, response.error?.code) // expects the "Too many requests" code; presumably the first four are still in flight (timing-dependent)
+ }
+ async { // sixth request, issued after 1000 ms
+ delay(1000)
+ val response = handler.handleRequest(JSONRPCRequest(7, "foo", arrayOf()))
+ assertEquals(1, response.id) // expects a normal response again; presumably the earlier requests have completed by then
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]