sajjad-moradi commented on code in PR #9201:
URL: https://github.com/apache/pinot/pull/9201#discussion_r951967422
##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java:
##########
@@ -116,4 +121,40 @@ public void testBuildCache() throws Exception {
assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache
verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_C);
}
+
+ @Test
+ public void testMetricEmitter() {
+ // setup metric emitter
+ double rateLimit = 2; // 2 msgs/sec = 120 msgs/min
+ ServerMetrics serverMetrics = mock(ServerMetrics.class);
+ MetricEmitter metricEmitter = new MetricEmitter(serverMetrics,
"tableA-topicB-partition5");
+
+ // 1st minute: no metrics should be emitted in the first minute
+ Instant now = Clock.fixed(Instant.parse("2022-08-10T12:00:02Z"),
ZoneOffset.UTC).instant();
+ assertEquals(metricEmitter.emitMetric(10, rateLimit, now), 0);
+ now = Clock.fixed(Instant.parse("2022-08-10T12:00:10Z"),
ZoneOffset.UTC).instant();
+ assertEquals(metricEmitter.emitMetric(20, rateLimit, now), 0);
+ now = Clock.fixed(Instant.parse("2022-08-10T12:00:30Z"),
ZoneOffset.UTC).instant();
+ assertEquals(metricEmitter.emitMetric(5, rateLimit, now), 0);
+ now = Clock.fixed(Instant.parse("2022-08-10T12:00:55Z"),
ZoneOffset.UTC).instant();
+ assertEquals(metricEmitter.emitMetric(25, rateLimit, now), 0);
+
+ // 2nd minute: metric should be emitted
+ now = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"),
ZoneOffset.UTC).instant();
+ assertEquals(metricEmitter.emitMetric(35, rateLimit, now), (int)
Math.round((10 + 20 + 5 + 25) / 120.0 * 100));
Review Comment:
I thought having the numbers there shows how the calculation is done, but
apparently it wasn't clear enough! Did refactor to better explain the numbers,
however I kept the separation of what happens in each minute so it's more clear
what we want to achieve there.
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -42,7 +42,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
// Upsert metrics
UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false),
// Dedup metrics
- DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false);
+ DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
+ CONSUMPTION_RATE_TO_LIMIT_RATIO_PERCENT("ratio", false);
Review Comment:
It's the ratio of "consumption rate" over "consumption rate limit" in terms
of percentage. For example, if consumption rate is 45 and consumption rate
limit is 50, the ratio_percentage will be 90%.
Let me know if you can come up with a better name.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -69,7 +73,8 @@ public void enableThrottling() {
_isThrottlingAllowed = true;
}
- public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig,
String tableName) {
+ public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig,
String tableName,
Review Comment:
I didn't create one because the new overloaded method will be only used in
test. That being said, your point is valid and we should treat test code as
production code. Just updated the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]