This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ee1b425328 Add config to set consumption rate limit at partition 
level (#18904)
7ee1b425328 is described below

commit 7ee1b425328956c058c6bf7a57eea9e2b9f12061
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jul 1 17:00:23 2026 -0700

    Add config to set consumption rate limit at partition level (#18904)
---
 .../realtime/RealtimeConsumptionRateManager.java   | 36 +++++----
 .../RealtimeConsumptionRateManagerTest.java        | 51 +++++++-----
 .../org/apache/pinot/spi/stream/StreamConfig.java  | 90 +++++++++++++---------
 .../pinot/spi/stream/StreamConfigProperties.java   |  3 +-
 4 files changed, 109 insertions(+), 71 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index b050122c03c..56d54ffdd3b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -48,17 +48,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class is responsible for creating realtime consumption rate limiters.
- * It contains one rate limiter for the entire server and multiple table 
partition level rate limiters.
- * Server rate limiter is used to throttle the overall consumption rate of the 
server and configured via
- * cluster or server config.
- * For table partition level rate limiter, the rate limit value specified in 
StreamConfig of table config, is for the
- * entire topic. The effective rate limit for each partition is simply the 
specified rate limit divided by the
- * partition count.
- * This class leverages a cache for storing partition count for different 
topics as retrieving partition count from
- * stream is a bit expensive and also the same count will be used of all 
partition consumers of the same topic.
- */
+/// This class is responsible for creating realtime consumption rate limiters.
+/// It contains one rate limiter for the entire server and multiple table 
partition level rate limiters.
+/// Server rate limiter is used to throttle the overall consumption rate of 
the server and configured via cluster or
+/// server config.
+/// For table partition level rate limiter, the rate limit can be specified in 
[StreamConfig] of table config in two
+/// ways:
+/// - Partition level rate limit: directly used as the rate limit for each 
partition. It doesn't need to be adjusted
+///   when the partition count of the topic changes.
+/// - Topic level rate limit: the rate limit for the entire topic. The 
effective rate limit for each partition is the
+///   specified rate limit divided by the partition count. When both are 
specified, partition level rate limit takes
+///   precedence.
+/// This class leverages a cache for storing partition count for different 
topics as retrieving partition count from
+/// stream is a bit expensive and also the same count will be used of all 
partition consumers of the same topic.
 public class RealtimeConsumptionRateManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeConsumptionRateManager.class);
   private static final int CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES = 10;
@@ -171,7 +173,14 @@ public class RealtimeConsumptionRateManager {
 
   public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig, 
String tableName,
       ServerMetrics serverMetrics, String metricKeyName) {
-    if (streamConfig.getTopicConsumptionRateLimit().isEmpty()) {
+    double partitionRateLimit = 
streamConfig.getPartitionConsumptionRateLimit();
+    if (partitionRateLimit > 0) {
+      LOGGER.info("A consumption rate limiter is set up for topic {} in table 
{} with partition rate limit: {}",
+          streamConfig.getTopicName(), tableName, partitionRateLimit);
+      return new PartitionRateLimiter(partitionRateLimit, serverMetrics, 
metricKeyName);
+    }
+    double topicRateLimit = streamConfig.getTopicConsumptionRateLimit();
+    if (topicRateLimit <= 0) {
       return NOOP_RATE_LIMITER;
     }
     int partitionCount;
@@ -181,8 +190,7 @@ public class RealtimeConsumptionRateManager {
       // Exception here means for some reason, partition count cannot be 
fetched from stream!
       throw new RuntimeException(e);
     }
-    double topicRateLimit = streamConfig.getTopicConsumptionRateLimit().get();
-    double partitionRateLimit = topicRateLimit / partitionCount;
+    partitionRateLimit = topicRateLimit / partitionCount;
     LOGGER.info("A consumption rate limiter is set up for topic {} in table {} 
with rate limit: {} "
             + "(topic rate limit: {}, partition count: {})", 
streamConfig.getTopicName(), tableName, partitionRateLimit,
         topicRateLimit, partitionCount);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
index 3c56c717fbe..bcc40a7d9ca 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
@@ -23,7 +23,6 @@ import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.util.Arrays;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -45,17 +44,20 @@ import static org.testng.Assert.assertEquals;
 public class RealtimeConsumptionRateManagerTest {
   private static final int NUM_PARTITIONS_TOPIC_A = 10;
   private static final int NUM_PARTITIONS_TOPIC_B = 20;
-  private static final Double RATE_LIMIT_FOR_ENTIRE_TOPIC = 50.0;
+  private static final double RATE_LIMIT_FOR_PARTITION = 4.0;
+  private static final double RATE_LIMIT_FOR_ENTIRE_TOPIC = 50.0;
   private static final String TABLE_NAME = "table-XYZ";
   private static final double DELTA = 0.0001;
   private static final StreamConfig STREAM_CONFIG_A = mock(StreamConfig.class);
   private static final StreamConfig STREAM_CONFIG_B = mock(StreamConfig.class);
   private static final StreamConfig STREAM_CONFIG_C = mock(StreamConfig.class);
+  private static final StreamConfig STREAM_CONFIG_D = mock(StreamConfig.class);
+  private static final StreamConfig STREAM_CONFIG_E = mock(StreamConfig.class);
   private static final PinotConfiguration SERVER_CONFIG_1 = 
mock(PinotConfiguration.class);
   private static final PinotConfiguration SERVER_CONFIG_2 = 
mock(PinotConfiguration.class);
   private static final PinotConfiguration SERVER_CONFIG_3 = 
mock(PinotConfiguration.class);
   private static final PinotConfiguration SERVER_CONFIG_4 = 
mock(PinotConfiguration.class);
-  private static RealtimeConsumptionRateManager _consumptionRateManager;
+  private static final RealtimeConsumptionRateManager CONSUMPTION_RATE_MANAGER;
 
   static {
     LoadingCache<StreamConfig, Integer> cache = mock(LoadingCache.class);
@@ -63,12 +65,15 @@ public class RealtimeConsumptionRateManagerTest {
       when(cache.get(STREAM_CONFIG_A)).thenReturn(NUM_PARTITIONS_TOPIC_A);
       when(cache.get(STREAM_CONFIG_B)).thenReturn(NUM_PARTITIONS_TOPIC_B);
     } catch (ExecutionException e) {
-      e.printStackTrace();
+      throw new RuntimeException(e);
     }
-    
when(STREAM_CONFIG_A.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
-    
when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
-    
when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(Optional.empty());
-    _consumptionRateManager = new RealtimeConsumptionRateManager(cache);
+    
when(STREAM_CONFIG_A.getTopicConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_ENTIRE_TOPIC);
+    
when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_ENTIRE_TOPIC);
+    
when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(StreamConfig.CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED);
+    
when(STREAM_CONFIG_D.getPartitionConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_PARTITION);
+    
when(STREAM_CONFIG_E.getPartitionConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_PARTITION);
+    
when(STREAM_CONFIG_E.getTopicConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_ENTIRE_TOPIC);
+    CONSUMPTION_RATE_MANAGER = new RealtimeConsumptionRateManager(cache);
 
     
when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
         
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
@@ -91,22 +96,30 @@ public class RealtimeConsumptionRateManagerTest {
   @Test
   public void testCreateRateLimiter() {
     // topic A
-    ConsumptionRateLimiter rateLimiter = 
_consumptionRateManager.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
-    assertEquals(5.0, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+    ConsumptionRateLimiter rateLimiter = 
CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 5.0, DELTA);
 
     // topic B
-    rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_B, 
TABLE_NAME);
-    assertEquals(2.5, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_B, 
TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 2.5, DELTA);
 
     // topic C
-    rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_C, 
TABLE_NAME);
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_C, 
TABLE_NAME);
     assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+
+    // topic D: partition level rate limit is used directly, without fetching 
the partition count
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_D, 
TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);
+
+    // topic E: partition level rate limit takes precedence over topic level 
rate limit
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_E, 
TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);
   }
 
   @Test
   public void testCreateServerRateLimiter() {
     // Server config 1
-    ConsumptionRateLimiter rateLimiter = 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_1, null);
+    ConsumptionRateLimiter rateLimiter = 
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_1, null);
     ServerRateLimiter serverRateLimiter = (ServerRateLimiter) rateLimiter;
     try {
       assertEquals(serverRateLimiter.getRate(), 5.0, DELTA);
@@ -116,7 +129,7 @@ public class RealtimeConsumptionRateManagerTest {
     }
 
     // Server config 2
-    serverRateLimiter = (ServerRateLimiter) 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_2, null);
+    serverRateLimiter = (ServerRateLimiter) 
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_2, null);
     try {
       assertEquals(((ServerRateLimiter) rateLimiter).getRate(), 2.5, DELTA);
       assertEquals(serverRateLimiter.getRate(), 2.5, DELTA);
@@ -125,16 +138,16 @@ public class RealtimeConsumptionRateManagerTest {
     }
 
     // Server config 3
-    rateLimiter = 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_3, null);
+    rateLimiter = 
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_3, null);
     assertEquals(rateLimiter, NOOP_RATE_LIMITER);
 
     // Server config 4
-    rateLimiter = 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_4, null);
+    rateLimiter = 
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_4, null);
     assertEquals(rateLimiter, NOOP_RATE_LIMITER);
 
     ServerRateLimitConfig serverRateLimitConfig = new ServerRateLimitConfig(1, 
MessageCountThrottlingStrategy.INSTANCE);
-    _consumptionRateManager.updateServerRateLimiter(serverRateLimitConfig, 
null);
-    serverRateLimiter = (ServerRateLimiter) 
_consumptionRateManager.getServerRateLimiter();
+    CONSUMPTION_RATE_MANAGER.updateServerRateLimiter(serverRateLimitConfig, 
null);
+    serverRateLimiter = (ServerRateLimiter) 
CONSUMPTION_RATE_MANAGER.getServerRateLimiter();
     try {
       assertEquals(serverRateLimiter.getRate(), 1);
       assertEquals(serverRateLimiter.getMetricEmitter().getRate(), 1);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 7f502fd23f3..60e6b1d025f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.DataSizeUtils;
@@ -50,7 +49,7 @@ public class StreamConfig {
   public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
   public static final int DEFAULT_IDLE_TIMEOUT_MILLIS = 3 * 60 * 1000;
 
-  private static final double CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED = -1;
+  public static final double CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED = -1;
 
   private final String _type;
   private final String _topicName;
@@ -71,8 +70,7 @@ public class StreamConfig {
   private final double _flushThresholdVarianceFraction;
   private final int _flushAutotuneInitialRows; // initial num rows to use for 
SegmentSizeBasedFlushThresholdUpdater
 
-  private final String _groupId;
-
+  private final double _partitionConsumptionRateLimit;
   private final double _topicConsumptionRateLimit;
 
   private final boolean _enableOffsetAutoReset;
@@ -198,11 +196,13 @@ public class StreamConfig {
     }
     _flushAutotuneInitialRows = autotuneInitialRows > 0 ? autotuneInitialRows 
: DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
 
-    String groupIdKey = StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.GROUP_ID);
-    _groupId = streamConfigMap.get(groupIdKey);
+    String partitionRate = 
streamConfigMap.get(StreamConfigProperties.PARTITION_CONSUMPTION_RATE_LIMIT);
+    _partitionConsumptionRateLimit =
+        partitionRate != null ? Double.parseDouble(partitionRate) : 
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
 
-    String rate = 
streamConfigMap.get(StreamConfigProperties.TOPIC_CONSUMPTION_RATE_LIMIT);
-    _topicConsumptionRateLimit = rate != null ? Double.parseDouble(rate) : 
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
+    String topicRate = 
streamConfigMap.get(StreamConfigProperties.TOPIC_CONSUMPTION_RATE_LIMIT);
+    _topicConsumptionRateLimit =
+        topicRate != null ? Double.parseDouble(topicRate) : 
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
 
     _enableOffsetAutoReset = 
Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET));
     _offsetAutoResetOffsetThreshold = 
parseOffsetAutoResetOffsetThreshold(streamConfigMap);
@@ -413,13 +413,14 @@ public class StreamConfig {
     return _flushAutotuneInitialRows;
   }
 
-  public String getGroupId() {
-    return _groupId;
+  /// Returns the partition level consumption rate limit. Non-positive value 
means consumption is not throttled.
+  public double getPartitionConsumptionRateLimit() {
+    return _partitionConsumptionRateLimit;
   }
 
-  public Optional<Double> getTopicConsumptionRateLimit() {
-    return _topicConsumptionRateLimit == CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED 
? Optional.empty()
-        : Optional.of(_topicConsumptionRateLimit);
+  /// Returns the topic level consumption rate limit. Non-positive value means 
consumption is not throttled.
+  public double getTopicConsumptionRateLimit() {
+    return _topicConsumptionRateLimit;
   }
 
   public boolean isEnableOffsetAutoReset() {
@@ -448,22 +449,32 @@ public class StreamConfig {
 
   @Override
   public String toString() {
-    return "StreamConfig{" + "_type='" + _type + '\'' + ", _topicName='" + 
_topicName + '\'' + ", _tableNameWithType='"
-        + _tableNameWithType + '\'' + ", _consumerFactoryClassName='" + 
_consumerFactoryClassName + '\''
-        + ", _decoderClass='" + _decoderClass + '\'' + ", _decoderProperties=" 
+ _decoderProperties
-        + ", _connectionTimeoutMillis=" + _connectionTimeoutMillis + ", 
_fetchTimeoutMillis=" + _fetchTimeoutMillis
-        + ", _idleTimeoutMillis=" + _idleTimeoutMillis + ", 
_flushThresholdRows=" + _flushThresholdRows
-        + ", _flushThresholdSegmentRows=" + _flushThresholdSegmentRows + ", 
_flushThresholdTimeMillis="
-        + _flushThresholdTimeMillis + ", _flushThresholdSegmentSizeBytes=" + 
_flushThresholdSegmentSizeBytes
+    return "StreamConfig{"
+        + "_type='" + _type + '\''
+        + ", _topicName='" + _topicName + '\''
+        + ", _tableNameWithType='" + _tableNameWithType + '\''
+        + ", _consumerFactoryClassName='" + _consumerFactoryClassName + '\''
+        + ", _decoderClass='" + _decoderClass + '\''
+        + ", _decoderProperties=" + _decoderProperties
+        + ", _connectionTimeoutMillis=" + _connectionTimeoutMillis
+        + ", _fetchTimeoutMillis=" + _fetchTimeoutMillis
+        + ", _idleTimeoutMillis=" + _idleTimeoutMillis
+        + ", _flushThresholdRows=" + _flushThresholdRows
+        + ", _flushThresholdSegmentRows=" + _flushThresholdSegmentRows
+        + ", _flushThresholdTimeMillis=" + _flushThresholdTimeMillis
+        + ", _flushThresholdSegmentSizeBytes=" + 
_flushThresholdSegmentSizeBytes
         + ", _flushThresholdVarianceFraction=" + 
_flushThresholdVarianceFraction
-        + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ", 
_groupId='" + _groupId + '\''
+        + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows
+        + ", _partitionConsumptionRateLimit=" + _partitionConsumptionRateLimit
         + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit
         + ", _enableOffsetAutoReset=" + _enableOffsetAutoReset
-        + ", _offsetAutoResetOffsetThreshold" + _offsetAutoResetOffsetThreshold
-        + ", _offSetAutoResetTimeSecThreshold" + 
_offsetAutoResetTimeSecThreshold
+        + ", _offsetAutoResetOffsetThreshold=" + 
_offsetAutoResetOffsetThreshold
+        + ", _offsetAutoResetTimeSecThreshold=" + 
_offsetAutoResetTimeSecThreshold
         + ", _backfillTopic=" + _backfillTopic
         + ", _streamConfigMap=" + _streamConfigMap
-        + ", _offsetCriteria=" + _offsetCriteria + ", 
_serverUploadToDeepStore=" + _serverUploadToDeepStore + '}';
+        + ", _offsetCriteria=" + _offsetCriteria
+        + ", _serverUploadToDeepStore=" + _serverUploadToDeepStore
+        + '}';
   }
 
   @Override
@@ -475,23 +486,30 @@ public class StreamConfig {
       return false;
     }
     StreamConfig that = (StreamConfig) o;
-    return _connectionTimeoutMillis == that._connectionTimeoutMillis && 
_fetchTimeoutMillis == that._fetchTimeoutMillis
-        && _idleTimeoutMillis == that._idleTimeoutMillis && 
_flushThresholdRows == that._flushThresholdRows
+    return Objects.equals(_type, that._type)
+        && Objects.equals(_topicName, that._topicName)
+        && Objects.equals(_tableNameWithType, that._tableNameWithType)
+        && Objects.equals(_consumerFactoryClassName, 
that._consumerFactoryClassName)
+        && Objects.equals(_decoderClass, that._decoderClass)
+        && Objects.equals(_decoderProperties, that._decoderProperties)
+        && _connectionTimeoutMillis == that._connectionTimeoutMillis
+        && _fetchTimeoutMillis == that._fetchTimeoutMillis
+        && _idleTimeoutMillis == that._idleTimeoutMillis
+        && _flushThresholdRows == that._flushThresholdRows
         && _flushThresholdSegmentRows == that._flushThresholdSegmentRows
         && _flushThresholdTimeMillis == that._flushThresholdTimeMillis
         && _flushThresholdSegmentSizeBytes == 
that._flushThresholdSegmentSizeBytes
+        && Double.compare(_flushThresholdVarianceFraction, 
that._flushThresholdVarianceFraction) == 0
         && _flushAutotuneInitialRows == that._flushAutotuneInitialRows
+        && Double.compare(_partitionConsumptionRateLimit, 
that._partitionConsumptionRateLimit) == 0
         && Double.compare(_topicConsumptionRateLimit, 
that._topicConsumptionRateLimit) == 0
-        && Objects.equals(_serverUploadToDeepStore, 
that._serverUploadToDeepStore) && Objects.equals(_type, that._type)
-        && Objects.equals(_topicName, that._topicName) && 
Objects.equals(_tableNameWithType, that._tableNameWithType)
-        && Objects.equals(_consumerFactoryClassName, 
that._consumerFactoryClassName) && Objects.equals(_decoderClass,
-        that._decoderClass) && Objects.equals(_decoderProperties, 
that._decoderProperties) && Objects.equals(_groupId,
-        that._groupId) && Objects.equals(_streamConfigMap, 
that._streamConfigMap) && Objects.equals(_offsetCriteria,
-        that._offsetCriteria) && 
Objects.equals(_flushThresholdVarianceFraction, 
that._flushThresholdVarianceFraction)
         && _enableOffsetAutoReset == that._enableOffsetAutoReset
         && _offsetAutoResetOffsetThreshold == 
that._offsetAutoResetOffsetThreshold
         && _offsetAutoResetTimeSecThreshold == 
that._offsetAutoResetTimeSecThreshold
-        && Objects.equals(_backfillTopic, that._backfillTopic);
+        && Objects.equals(_backfillTopic, that._backfillTopic)
+        && Objects.equals(_streamConfigMap, that._streamConfigMap)
+        && Objects.equals(_offsetCriteria, that._offsetCriteria)
+        && Objects.equals(_serverUploadToDeepStore, 
that._serverUploadToDeepStore);
   }
 
   @Override
@@ -499,8 +517,8 @@ public class StreamConfig {
     return Objects.hash(_type, _topicName, _tableNameWithType, 
_consumerFactoryClassName, _decoderClass,
         _decoderProperties, _connectionTimeoutMillis, _fetchTimeoutMillis, 
_idleTimeoutMillis, _flushThresholdRows,
         _flushThresholdSegmentRows, _flushThresholdTimeMillis, 
_flushThresholdSegmentSizeBytes,
-        _flushAutotuneInitialRows, _groupId, _topicConsumptionRateLimit, 
_streamConfigMap, _offsetCriteria,
-        _serverUploadToDeepStore, _flushThresholdVarianceFraction, 
_offsetAutoResetOffsetThreshold,
-        _enableOffsetAutoReset, _offsetAutoResetTimeSecThreshold, 
_backfillTopic);
+        _flushThresholdVarianceFraction, _flushAutotuneInitialRows, 
_partitionConsumptionRateLimit,
+        _topicConsumptionRateLimit, _enableOffsetAutoReset, 
_offsetAutoResetOffsetThreshold,
+        _offsetAutoResetTimeSecThreshold, _backfillTopic, _streamConfigMap, 
_offsetCriteria, _serverUploadToDeepStore);
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index aa00ceb8870..aefa0e9093d 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -45,8 +45,7 @@ public class StreamConfigProperties {
   public static final String STREAM_IDLE_TIMEOUT_MILLIS = 
"idle.timeout.millis";
   public static final String STREAM_DECODER_CLASS = "decoder.class.name";
   public static final String DECODER_PROPS_PREFIX = "decoder.prop";
-  public static final String GROUP_ID = "hlc.group.id";
-  public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = 
"partition.offset.factory.class.name";
+  public static final String PARTITION_CONSUMPTION_RATE_LIMIT = 
"partition.consumption.rate.limit";
   public static final String TOPIC_CONSUMPTION_RATE_LIMIT = 
"topic.consumption.rate.limit";
   public static final String METADATA_POPULATE = "metadata.populate";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to