jugomezv commented on code in PR #9201:
URL: https://github.com/apache/pinot/pull/9201#discussion_r951930928


##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java:
##########
@@ -116,4 +121,40 @@ public void testBuildCache() throws Exception {
     assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache
     verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_C);
   }
+
+  @Test
+  public void testMetricEmitter() {
+    // setup metric emitter
+    double rateLimit = 2; // 2 msgs/sec = 120 msgs/min
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    MetricEmitter metricEmitter = new MetricEmitter(serverMetrics, 
"tableA-topicB-partition5");
+
+    // 1st minute: no metrics should be emitted in the first minute
+    Instant now = Clock.fixed(Instant.parse("2022-08-10T12:00:02Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(10, rateLimit, now), 0);
+    now = Clock.fixed(Instant.parse("2022-08-10T12:00:10Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(20, rateLimit, now), 0);
+    now = Clock.fixed(Instant.parse("2022-08-10T12:00:30Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(5, rateLimit, now), 0);
+    now = Clock.fixed(Instant.parse("2022-08-10T12:00:55Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(25, rateLimit, now), 0);
+
+    // 2nd minute: metric should be emitted
+    now = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(35, rateLimit, now), (int) 
Math.round((10 + 20 + 5 + 25) / 120.0 * 100));

Review Comment:
   can we have a comment to explain this calculation?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -69,7 +73,8 @@ public void enableThrottling() {
     _isThrottlingAllowed = true;
   }
 
-  public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig, 
String tableName) {
+  public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig, 
String tableName,

Review Comment:
   Can we overload this method and then call createRateLimiter(streamConfig, 
tableName, null, null) from the createRateLimiter(streamConfig, tabelName)? 
that reduces the touch points of the change as old invocations remain the same



##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -42,7 +42,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   // Upsert metrics
   UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false),
   // Dedup metrics
-  DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false);
+  DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
+  CONSUMPTION_RATE_TO_LIMIT_RATIO_PERCENT("ratio", false);

Review Comment:
   This name is a bit confusing: is a rate or a ratio? can we have a more 
telling name?



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java:
##########
@@ -116,4 +121,40 @@ public void testBuildCache() throws Exception {
     assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache
     verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_C);
   }
+
+  @Test
+  public void testMetricEmitter() {
+    // setup metric emitter
+    double rateLimit = 2; // 2 msgs/sec = 120 msgs/min
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    MetricEmitter metricEmitter = new MetricEmitter(serverMetrics, 
"tableA-topicB-partition5");
+
+    // 1st minute: no metrics should be emitted in the first minute
+    Instant now = Clock.fixed(Instant.parse("2022-08-10T12:00:02Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(10, rateLimit, now), 0);
+    now = Clock.fixed(Instant.parse("2022-08-10T12:00:10Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(20, rateLimit, now), 0);
+    now = Clock.fixed(Instant.parse("2022-08-10T12:00:30Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(5, rateLimit, now), 0);
+    now = Clock.fixed(Instant.parse("2022-08-10T12:00:55Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(25, rateLimit, now), 0);
+
+    // 2nd minute: metric should be emitted
+    now = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"), 
ZoneOffset.UTC).instant();
+    assertEquals(metricEmitter.emitMetric(35, rateLimit, now), (int) 
Math.round((10 + 20 + 5 + 25) / 120.0 * 100));

Review Comment:
   I would suggest to use constants and name them startTime, 
startTimeplusFiveSec etc, that will make code very easy to read, this also 
avoids the need of comments like 3rd minute, 4th minute etc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to