This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 7eb7079 Adding internal table-api config for table-api rate limits
(#1431)
7eb7079 is described below
commit 7eb7079e0016eac39c5a923fef7e306668418ad1
Author: FAN XIE <[email protected]>
AuthorDate: Mon Sep 21 12:19:09 2020 -0700
Adding internal table-api config for table-api rate limits (#1431)
* Adding internal table-api config for table-api rate limits
* address comments
* style
* style
---
.../table/descriptors/RemoteTableDescriptor.java | 57 ++++++++++++++--------
.../descriptors/TestRemoteTableDescriptor.java | 11 +++++
2 files changed, 47 insertions(+), 21 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 23234cd..4c45dde 100644
---
a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++
b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -70,6 +70,10 @@ public class RemoteTableDescriptor<K, V> extends
BaseTableDescriptor<K, V, Remot
public static final String READ_FN = "io.read.func";
public static final String WRITE_FN = "io.write.func";
public static final String RATE_LIMITER = "io.ratelimiter";
+ //Key name for table api read rate limit
+ public static final String READ_CREDITS = "io.read.credits";
+ //Key name for table api write rate limit
+ public static final String WRITE_CREDITS = "io.write.credits";
public static final String READ_CREDIT_FN = "io.read.credit.func";
public static final String WRITE_CREDIT_FN = "io.write.credit.func";
public static final String ASYNC_CALLBACK_POOL_SIZE =
"io.async.callback.pool.size";
@@ -283,27 +287,7 @@ public class RemoteTableDescriptor<K, V> extends
BaseTableDescriptor<K, V, Remot
Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
- // Handle rate limiter
- if (!tagCreditsMap.isEmpty()) {
- RateLimiter defaultRateLimiter;
- try {
- @SuppressWarnings("unchecked")
- Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>)
Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
- Constructor<? extends RateLimiter> ctor =
clazz.getConstructor(Map.class);
- defaultRateLimiter = ctor.newInstance(tagCreditsMap);
- } catch (Exception ex) {
- throw new SamzaException("Failed to create default rate limiter", ex);
- }
- addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter",
defaultRateLimiter), tableConfig);
- if (defaultRateLimiter instanceof TablePart) {
- addTablePartConfig(RATE_LIMITER, (TablePart) defaultRateLimiter,
jobConfig, tableConfig);
- }
- } else if (rateLimiter != null) {
- addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter",
rateLimiter), tableConfig);
- if (rateLimiter instanceof TablePart) {
- addTablePartConfig(RATE_LIMITER, (TablePart) rateLimiter, jobConfig,
tableConfig);
- }
- }
+ writeRateLimiterConfig(jobConfig, tableConfig);
// Handle readCredit functions
if (readCreditFn != null) {
@@ -350,6 +334,37 @@ public class RemoteTableDescriptor<K, V> extends
BaseTableDescriptor<K, V, Remot
return Collections.unmodifiableMap(tableConfig);
}
+ // Handle rate limiter
+ private void writeRateLimiterConfig(Config jobConfig, Map<String, String>
tableConfig) {
+ if (!tagCreditsMap.isEmpty()) {
+ RateLimiter defaultRateLimiter;
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>)
Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
+ Constructor<? extends RateLimiter> ctor =
clazz.getConstructor(Map.class);
+ defaultRateLimiter = ctor.newInstance(tagCreditsMap);
+ } catch (Exception ex) {
+ throw new SamzaException("Failed to create default rate limiter", ex);
+ }
+ addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter",
defaultRateLimiter), tableConfig);
+ if (defaultRateLimiter instanceof TablePart) {
+ addTablePartConfig(RATE_LIMITER, (TablePart) defaultRateLimiter,
jobConfig, tableConfig);
+ }
+ } else if (rateLimiter != null) {
+ addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter",
rateLimiter), tableConfig);
+ if (rateLimiter instanceof TablePart) {
+ addTablePartConfig(RATE_LIMITER, (TablePart) rateLimiter, jobConfig,
tableConfig);
+ }
+ }
+ //Write table api read/write rate limit
+ if (this.enableReadRateLimiter && tagCreditsMap.containsKey(RL_READ_TAG)) {
+ addTableConfig(READ_CREDITS,
String.valueOf(tagCreditsMap.get(RL_READ_TAG)), tableConfig);
+ }
+ if (this.enableWriteRateLimiter &&
tagCreditsMap.containsKey(RL_WRITE_TAG)) {
+ addTableConfig(WRITE_CREDITS,
String.valueOf(tagCreditsMap.get(RL_WRITE_TAG)), tableConfig);
+ }
+ }
+
@Override
protected void validate() {
Preconditions.checkArgument(writeFn != null || readFn != null,
diff --git
a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index cf03599..18f2582 100644
---
a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++
b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -254,6 +254,17 @@ public class TestRemoteTableDescriptor {
"{\"exponentialFactor\":0.0,\"backoffType\":\"NONE\",\"retryPredicate\":{}}");
}
+ @Test
+ public void testReadWriteRateLimitToConfig() {
+ Map<String, String> tableConfig = new
RemoteTableDescriptor("1").withReadFunction(createMockTableReadFunction())
+ .withReadRetryPolicy(new TableRetryPolicy())
+ .withWriteRateLimit(1000)
+ .withReadRateLimit(2000)
+ .toConfig(new MapConfig());
+ Assert.assertEquals(String.valueOf(2000),
tableConfig.get("tables.1.io.read.credits"));
+ Assert.assertEquals(String.valueOf(1000),
tableConfig.get("tables.1.io.write.credits"));
+ }
+
private Context createMockContext(TableDescriptor tableDescriptor) {
Context context = mock(Context.class);