Copilot commented on code in PR #18904:
URL: https://github.com/apache/pinot/pull/18904#discussion_r3509414371
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -198,11 +196,13 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
}
_flushAutotuneInitialRows = autotuneInitialRows > 0 ? autotuneInitialRows : DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
- String groupIdKey = StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.GROUP_ID);
- _groupId = streamConfigMap.get(groupIdKey);
+ String partitionRate = streamConfigMap.get(StreamConfigProperties.PARTITION_CONSUMPTION_RATE_LIMIT);
+ _partitionConsumptionRateLimit =
+ partitionRate != null ? Double.parseDouble(partitionRate) : CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
Review Comment:
The new partition/topic rate limit parsing uses Double.parseDouble() without
adding config-key context and will also accept non-finite values like
NaN/Infinity. A malformed value would fail with an unhelpful
NumberFormatException (or worse, silently disable throttling for NaN partition
limits). Consider validating the value and throwing an IllegalArgumentException
that includes the config key and value.
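A minimal sketch of the validation suggested above, assuming a hypothetical helper class `RateLimitParser` and a stand-in sentinel `NOT_SPECIFIED` (the actual value of `CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED` is not shown in this diff). It is not the PR's implementation, only one way the parse could carry the config key and reject non-finite values:

```java
import java.util.Map;

// Hypothetical helper, not part of Pinot: parses an optional rate-limit config value.
final class RateLimitParser {
  // Assumed stand-in for CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED; the real constant's value is not shown in the diff.
  static final double NOT_SPECIFIED = -1;

  private RateLimitParser() {
  }

  // Returns NOT_SPECIFIED when the key is absent; otherwise a finite double.
  // Throws IllegalArgumentException naming both the key and the offending value.
  static double parseRateLimit(Map<String, String> streamConfigMap, String key) {
    String raw = streamConfigMap.get(key);
    if (raw == null) {
      return NOT_SPECIFIED;
    }
    double value;
    try {
      value = Double.parseDouble(raw);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid value for '" + key + "': '" + raw + "' is not a number", e);
    }
    // Double.parseDouble accepts "NaN" and "Infinity", so reject them explicitly.
    if (!Double.isFinite(value)) {
      throw new IllegalArgumentException("Invalid value for '" + key + "': '" + raw + "' must be finite");
    }
    return value;
  }
}
```

With a helper like this, both the partition-level and topic-level keys could share one code path, so a typo in either config fails at table-config load time with the key named in the message.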
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java:
##########
@@ -45,8 +45,7 @@ private StreamConfigProperties() {
public static final String STREAM_IDLE_TIMEOUT_MILLIS = "idle.timeout.millis";
public static final String STREAM_DECODER_CLASS = "decoder.class.name";
public static final String DECODER_PROPS_PREFIX = "decoder.prop";
- public static final String GROUP_ID = "hlc.group.id";
- public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name";
+ public static final String PARTITION_CONSUMPTION_RATE_LIMIT = "partition.consumption.rate.limit";
public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit";
Review Comment:
Removing public constants from pinot-spi is a source-incompatible change for
third-party plugins that reference these keys directly: already-compiled
plugins keep working because javac inlines `String` constants, but they will no
longer compile against the new SPI. Consider keeping the legacy keys as
`@Deprecated` constants (even if Pinot no longer uses them) to preserve
compatibility while still steering users to newer configs.
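As a rough illustration of that suggestion, the legacy keys could stay next to the new one. The class name `StreamConfigPropertiesSketch` is hypothetical, and the Javadoc wording about Pinot no longer reading the keys is an assumption taken from the comment above; the key strings are copied from the diff:

```java
// Sketch only: mirrors the constants shown in the diff; not the actual Pinot class.
final class StreamConfigPropertiesSketch {
  private StreamConfigPropertiesSketch() {
  }

  /** @deprecated Assumed to be no longer read by Pinot; retained so existing plugins still compile. */
  @Deprecated
  public static final String GROUP_ID = "hlc.group.id";

  /** @deprecated Assumed to be no longer read by Pinot; retained so existing plugins still compile. */
  @Deprecated
  public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name";

  // New key introduced by the PR, alongside the existing topic-level key.
  public static final String PARTITION_CONSUMPTION_RATE_LIMIT = "partition.consumption.rate.limit";
  public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit";
}
```

Plugin authors referencing the deprecated fields would then get a compiler warning rather than a build failure, which leaves room to remove the constants in a later release.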
##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java:
##########
@@ -91,22 +96,30 @@ public class RealtimeConsumptionRateManagerTest {
@Test
public void testCreateRateLimiter() {
// topic A
- ConsumptionRateLimiter rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
- assertEquals(5.0, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+ ConsumptionRateLimiter rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 5.0, DELTA);
// topic B
- rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_B, TABLE_NAME);
- assertEquals(2.5, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_B, TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 2.5, DELTA);
// topic C
- rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_C, TABLE_NAME);
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_C, TABLE_NAME);
assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+
+ // topic D: partition level rate limit is used directly, without fetching the partition count
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_D, TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);
+
+ // topic E: partition level rate limit takes precedence over topic level rate limit
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_E, TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);
Review Comment:
The new partition-level rate limit tests describe that the
cache/partition-count fetch is skipped, but they don't currently assert that
behavior. If the cache is a mock, adding a zero-invocation check on it (for
example Mockito's `verify(..., times(0))` or `never()`) helps prevent
regressions where partition-level throttling accidentally triggers a
partition-count lookup.
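The idea can be sketched without a mocking library by using a counting fake. Everything here is hypothetical: `RateResolutionSketch.resolvePartitionRate` stands in for the real rate-limiter creation logic, the precedence rule is inferred from the test comments in the diff, and dividing the topic rate by the partition count is an assumption about how the topic-level limit is applied. In the actual test, a zero-invocation check on the mocked cache would play the role of `getCalls() == 0`.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for the rate-resolution logic under test.
final class RateResolutionSketch {
  private RateResolutionSketch() {
  }

  // Partition-level limit wins and needs no partition count;
  // the topic-level limit is (by assumption) split evenly across partitions.
  static double resolvePartitionRate(double partitionRate, double topicRate, IntSupplier partitionCountLookup) {
    if (partitionRate > 0) {
      return partitionRate;
    }
    if (topicRate > 0) {
      return topicRate / partitionCountLookup.getAsInt();
    }
    return -1; // non-positive: consumption is not throttled
  }
}

// Counting fake that records how many times the partition count was requested.
final class CountingPartitionCountLookup implements IntSupplier {
  private final int _partitionCount;
  private int _calls;

  CountingPartitionCountLookup(int partitionCount) {
    _partitionCount = partitionCount;
  }

  @Override
  public int getAsInt() {
    _calls++;
    return _partitionCount;
  }

  int getCalls() {
    return _calls;
  }
}
```

A test would resolve the rate for a config with a partition-level limit and then assert that the lookup's call count is still zero, which turns the "without fetching the partition count" comment into an enforced property.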
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -413,13 +413,14 @@ public int getFlushAutotuneInitialRows() {
return _flushAutotuneInitialRows;
}
- public String getGroupId() {
- return _groupId;
+ /// Returns the partition level consumption rate limit. Non-positive value means consumption is not throttled.
+ public double getPartitionConsumptionRateLimit() {
+ return _partitionConsumptionRateLimit;
}
- public Optional<Double> getTopicConsumptionRateLimit() {
- return _topicConsumptionRateLimit == CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED ? Optional.empty()
- : Optional.of(_topicConsumptionRateLimit);
+ /// Returns the topic level consumption rate limit. Non-positive value means consumption is not throttled.
+ public double getTopicConsumptionRateLimit() {
+ return _topicConsumptionRateLimit;
Review Comment:
Changing getTopicConsumptionRateLimit() from Optional<Double> to double (and
removing getGroupId()) in pinot-spi is a binary-incompatible API change for any
external plugins compiled against the old signature. If the intent is to avoid
a breaking change, consider keeping the old methods (deprecated) and
introducing new methods with different names (e.g.
getTopicConsumptionRateLimitValue()/getPartitionConsumptionRateLimitValue()),
then migrate internal call sites first and remove the deprecated methods in a
later major release.
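One possible shape for that migration path, sketched on a hypothetical `StreamConfigSketch` class rather than the real `StreamConfig`. The `...Value()` method names come from the suggestion above, and the `NOT_SPECIFIED` sentinel value is an assumption:

```java
import java.util.Optional;

// Sketch only: shows the old Optional-returning getter kept as deprecated next to new primitive getters.
final class StreamConfigSketch {
  // Assumed stand-in for CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED.
  static final double NOT_SPECIFIED = -1;

  private final double _topicConsumptionRateLimit;
  private final double _partitionConsumptionRateLimit;

  StreamConfigSketch(double topicConsumptionRateLimit, double partitionConsumptionRateLimit) {
    _topicConsumptionRateLimit = topicConsumptionRateLimit;
    _partitionConsumptionRateLimit = partitionConsumptionRateLimit;
  }

  /** @deprecated Kept with its original signature so existing plugins still link; use the Value variant. */
  @Deprecated
  public Optional<Double> getTopicConsumptionRateLimit() {
    return _topicConsumptionRateLimit == NOT_SPECIFIED ? Optional.empty()
        : Optional.of(_topicConsumptionRateLimit);
  }

  /// Returns the topic level consumption rate limit. Non-positive value means consumption is not throttled.
  public double getTopicConsumptionRateLimitValue() {
    return _topicConsumptionRateLimit;
  }

  /// Returns the partition level consumption rate limit. Non-positive value means consumption is not throttled.
  public double getPartitionConsumptionRateLimitValue() {
    return _partitionConsumptionRateLimit;
  }
}
```

Because Java resolves methods by name and descriptor including the return type, keeping the old `Optional<Double>` method under its original name is what preserves linkage for plugins compiled against the previous SPI.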