This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 42c6424  Make realtime threshold property names less ambiguous (#5953)
42c6424 is described below

commit 42c6424a56f5fd1c55b8da418fab703e2b259fdb
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Sep 1 10:44:51 2020 -0700

    Make realtime threshold property names less ambiguous (#5953)
---
 .../SegmentSizeBasedFlushThresholdUpdater.java     |  2 +-
 .../segment/FlushThresholdUpdaterTest.java         |  9 ++-
 .../core/realtime/stream/StreamConfigTest.java     | 49 ++++++++++++----
 .../spi/stream/PartitionLevelStreamConfig.java     |  2 +-
 .../org/apache/pinot/spi/stream/StreamConfig.java  | 68 +++++++++++++---------
 .../pinot/spi/stream/StreamConfigProperties.java   | 13 +++--
 6 files changed, 94 insertions(+), 49 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 2b601f3..2e73806 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -56,7 +56,7 @@ public class SegmentSizeBasedFlushThresholdUpdater implements 
FlushThresholdUpda
   public synchronized void updateFlushThreshold(PartitionLevelStreamConfig 
streamConfig,
       LLCRealtimeSegmentZKMetadata newSegmentZKMetadata, 
CommittingSegmentDescriptor committingSegmentDescriptor,
       @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int 
maxNumPartitionsPerInstance) {
-    final long desiredSegmentSizeBytes = 
streamConfig.getFlushSegmentDesiredSizeBytes();
+    final long desiredSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
     final long timeThresholdMillis = 
streamConfig.getFlushThresholdTimeMillis();
     final int autotuneInitialRows = streamConfig.getFlushAutotuneInitialRows();
     final long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 98c02b1..396e4bd 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -24,7 +24,6 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.Test;
 
@@ -101,14 +100,14 @@ public class FlushThresholdUpdaterTest {
     PartitionLevelStreamConfig streamConfig = 
mock(PartitionLevelStreamConfig.class);
     when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
     when(streamConfig.getFlushThresholdRows()).thenReturn(0);
-    
when(streamConfig.getFlushSegmentDesiredSizeBytes()).thenReturn(flushSegmentDesiredSizeBytes);
+    
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(flushSegmentDesiredSizeBytes);
     
when(streamConfig.getFlushThresholdTimeMillis()).thenReturn(flushThresholdTimeMillis);
     
when(streamConfig.getFlushAutotuneInitialRows()).thenReturn(flushAutotuneInitialRows);
     return streamConfig;
   }
 
   private PartitionLevelStreamConfig mockDefaultAutotuneStreamConfig() {
-    return 
mockAutotuneStreamConfig(StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES,
+    return 
mockAutotuneStreamConfig(StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES,
         StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS, 
StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS);
   }
 
@@ -121,7 +120,7 @@ public class FlushThresholdUpdaterTest {
   @Test
   public void testSegmentSizeBasedFlushThreshold() {
     PartitionLevelStreamConfig streamConfig = 
mockDefaultAutotuneStreamConfig();
-    long desiredSegmentSizeBytes = 
streamConfig.getFlushSegmentDesiredSizeBytes();
+    long desiredSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
     long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
     long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
 
@@ -307,7 +306,7 @@ public class FlushThresholdUpdaterTest {
     SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
 
     // Use customized stream config
-    long flushSegmentDesiredSizeBytes = 
StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES / 2;
+    long flushSegmentDesiredSizeBytes = 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2;
     long flushThresholdTimeMillis = 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS / 2;
     int flushAutotuneInitialRows = 
StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS / 2;
     PartitionLevelStreamConfig streamConfig =
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index af09f30..e5855dd 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -193,8 +193,8 @@ public class StreamConfigTest {
     Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
     Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
-    Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
-        StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES);
+    Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
+        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
 
     consumerType = "lowLevel,highLevel";
     String offsetCriteria = "smallest";
@@ -221,7 +221,7 @@ public class StreamConfigTest {
         fetchTimeout);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
flushThresholdRows);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
flushThresholdTime);
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE, 
flushSegmentSize);
+    
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE,
 flushSegmentSize);
 
     streamConfig = new StreamConfig(tableName, streamConfigMap);
     Assert.assertEquals(streamConfig.getType(), streamType);
@@ -238,7 +238,7 @@ public class StreamConfigTest {
     Assert.assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
     Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
-    Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(), 
DataSizeUtils.toBytes(flushSegmentSize));
+    Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), 
DataSizeUtils.toBytes(flushSegmentSize));
 
     // Backward compatibility check for flushThresholdTime
     flushThresholdTime = "18000000";
@@ -249,6 +249,18 @@ public class StreamConfigTest {
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
flushThresholdTime);
     streamConfig = new StreamConfig(tableName, streamConfigMap);
     Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
+
+    // Backward compatibility check for flush threshold rows
+    
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS,
 "10000");
+    streamConfig = new StreamConfig(tableName, streamConfigMap);
+    Assert.assertEquals(streamConfig.getFlushThresholdRows(), 10000);
+
+    // Backward compatibility check for flush threshold segment size
+    
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE);
+    
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE,
 "10M");
+    streamConfig = new StreamConfig(tableName, streamConfigMap);
+    Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), 
DataSizeUtils.toBytes("10M"));
   }
 
   /**
@@ -313,25 +325,38 @@ public class StreamConfigTest {
     Assert
         .assertEquals(streamConfig.getConnectionTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
 
-    // Invalid flush threshold rows
+    // Invalid flush threshold rows - deprecated property
     streamConfigMap.remove(StreamConfigProperties
         .constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS));
+    
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS,
 "rows");
+    streamConfig = new StreamConfig(tableName, streamConfigMap);
+    Assert.assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
+
+    // Invalid flush threshold rows - new property
+    
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
"rows");
     streamConfig = new StreamConfig(tableName, streamConfigMap);
     Assert.assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
 
     // Invalid flush threshold time
-    
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
"time");
     streamConfig = new StreamConfig(tableName, streamConfigMap);
     Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
 
-    // Invalid flush segment size
+    // Invalid flush segment size - deprecated property
     
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE, 
"size");
+    
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE,
 "size");
+    streamConfig = new StreamConfig(tableName, streamConfigMap);
+    Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
+        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
+
+    // Invalid flush segment size - new property
+    
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE);
+    
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE,
 "size");
     streamConfig = new StreamConfig(tableName, streamConfigMap);
-    Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
-        StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES);
+    Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
+        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
   }
 
   /**
@@ -380,7 +405,7 @@ public class StreamConfigTest {
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
 
     // llc overrides provided, but base values will be picked in base 
StreamConfigs
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS + 
StreamConfigProperties.LLC_SUFFIX,
+    
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS
 + StreamConfigProperties.LLC_SUFFIX,
         flushThresholdRowsLLC);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME + 
StreamConfigProperties.LLC_SUFFIX,
         flushThresholdTimeLLC);
@@ -403,7 +428,7 @@ public class StreamConfigTest {
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTimeLLC));
 
     // PartitionLevelStreamConfig should use base values if llc overrides not 
provided
-    streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS 
+ StreamConfigProperties.LLC_SUFFIX);
+    
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS
 + StreamConfigProperties.LLC_SUFFIX);
     streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME 
+ StreamConfigProperties.LLC_SUFFIX);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
flushThresholdRows);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
flushThresholdTime);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
index 0f6a383..e545b94 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
@@ -39,7 +39,7 @@ public class PartitionLevelStreamConfig extends StreamConfig {
   @Override
   protected int extractFlushThresholdRows(Map<String, String> streamConfigMap) 
{
     String flushThresholdRowsKey =
-        StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS + 
StreamConfigProperties.LLC_SUFFIX;
+        StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS + 
StreamConfigProperties.LLC_SUFFIX;
     String flushThresholdRowsStr = streamConfigMap.get(flushThresholdRowsKey);
     if (flushThresholdRowsStr != null) {
       try {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 817bcd5..d343203 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -46,7 +46,7 @@ public class StreamConfig {
 
   public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
   public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS = 
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
-  public static final long DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES = 200 * 
1024 * 1024; // 200M
+  public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 * 
1024 * 1024; // 200M
   public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
 
   public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
@@ -71,7 +71,7 @@ public class StreamConfig {
 
   private final int _flushThresholdRows;
   private final long _flushThresholdTimeMillis;
-  private final long _flushSegmentDesiredSizeBytes;
+  private final long _flushThresholdSegmentSizeBytes;
   private final int _flushAutotuneInitialRows; // initial num rows to use for 
SegmentSizeBasedFlushThresholdUpdater
 
   private final String _groupId;
@@ -163,22 +163,7 @@ public class StreamConfig {
 
     _flushThresholdRows = extractFlushThresholdRows(streamConfigMap);
     _flushThresholdTimeMillis = 
extractFlushThresholdTimeMillis(streamConfigMap);
-
-    long flushDesiredSize = -1;
-    String flushSegmentDesiredSizeValue = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE);
-    if (flushSegmentDesiredSizeValue != null) {
-      try {
-        flushDesiredSize = DataSizeUtils.toBytes(flushSegmentDesiredSizeValue);
-      } catch (Exception e) {
-        LOGGER.warn("Invalid config {}: {}, defaulting to: {}", 
StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE,
-            flushSegmentDesiredSizeValue, 
DataSizeUtils.fromBytes(DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES));
-      }
-    }
-    if (flushDesiredSize > 0) {
-      _flushSegmentDesiredSizeBytes = flushDesiredSize;
-    } else {
-      _flushSegmentDesiredSizeBytes = DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES;
-    }
+    _flushThresholdSegmentSizeBytes = 
extractFlushThresholdSegmentSize(streamConfigMap);
 
     int autotuneInitialRows = 0;
     String initialRowsValue = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
@@ -199,8 +184,39 @@ public class StreamConfig {
     _streamConfigMap.putAll(streamConfigMap);
   }
 
+  private long extractFlushThresholdSegmentSize(Map<String, String> 
streamConfigMap) {
+    long segmentSizeBytes = -1;
+    String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE;
+    String flushThresholdSegmentSizeStr = streamConfigMap.get(key);
+    if (flushThresholdSegmentSizeStr == null) {
+      // for backward compatibility with older property
+      key = StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE;
+      flushThresholdSegmentSizeStr = streamConfigMap.get(key);
+    }
+
+    if (flushThresholdSegmentSizeStr != null) {
+      try {
+        segmentSizeBytes = DataSizeUtils.toBytes(flushThresholdSegmentSizeStr);
+      } catch (Exception e) {
+        LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key, 
flushThresholdSegmentSizeStr,
+            
DataSizeUtils.fromBytes(DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES));
+      }
+    }
+    if (segmentSizeBytes > 0) {
+      return segmentSizeBytes;
+    } else {
+      return DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES;
+    }
+  }
+
   protected int extractFlushThresholdRows(Map<String, String> streamConfigMap) 
{
-    String flushThresholdRowsStr = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS;
+    String flushThresholdRowsStr = streamConfigMap.get(key);
+    if (flushThresholdRowsStr == null) {
+      // for backward compatibility with older property
+      key = StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS;
+      flushThresholdRowsStr = streamConfigMap.get(key);
+    }
     if (flushThresholdRowsStr != null) {
       try {
         int flushThresholdRows = Integer.parseInt(flushThresholdRowsStr);
@@ -208,8 +224,8 @@ public class StreamConfig {
         Preconditions.checkState(flushThresholdRows >= 0);
         return flushThresholdRows;
       } catch (Exception e) {
-        LOGGER.warn("Invalid config {}: {}, defaulting to: {}", 
StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
-            flushThresholdRowsStr, DEFAULT_FLUSH_THRESHOLD_ROWS);
+        LOGGER
+            .warn("Invalid config {}: {}, defaulting to: {}", key, 
flushThresholdRowsStr, DEFAULT_FLUSH_THRESHOLD_ROWS);
         return DEFAULT_FLUSH_THRESHOLD_ROWS;
       }
     } else {
@@ -289,8 +305,8 @@ public class StreamConfig {
     return _flushThresholdTimeMillis;
   }
 
-  public long getFlushSegmentDesiredSizeBytes() {
-    return _flushSegmentDesiredSizeBytes;
+  public long getFlushThresholdSegmentSizeBytes() {
+    return _flushThresholdSegmentSizeBytes;
   }
 
   public int getFlushAutotuneInitialRows() {
@@ -315,7 +331,7 @@ public class StreamConfig {
         + _consumerTypes + ", _consumerFactoryClassName='" + 
_consumerFactoryClassName + '\'' + ", _offsetCriteria='"
         + _offsetCriteria + '\'' + ", _connectionTimeoutMillis=" + 
_connectionTimeoutMillis + ", _fetchTimeoutMillis="
         + _fetchTimeoutMillis + ", _flushThresholdRows=" + _flushThresholdRows 
+ ", _flushThresholdTimeMillis="
-        + _flushThresholdTimeMillis + ", _flushSegmentDesiredSizeBytes=" + 
_flushSegmentDesiredSizeBytes
+        + _flushThresholdTimeMillis + ", _flushSegmentDesiredSizeBytes=" + 
_flushThresholdSegmentSizeBytes
         + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ", 
_decoderClass='" + _decoderClass + '\''
         + ", _decoderProperties=" + _decoderProperties + ", _groupId='" + 
_groupId + ", _tableNameWithType='"
         + _tableNameWithType + '}';
@@ -337,7 +353,7 @@ public class StreamConfig {
         .isEqual(_fetchTimeoutMillis, that._fetchTimeoutMillis) && 
EqualityUtils
         .isEqual(_flushThresholdRows, that._flushThresholdRows) && 
EqualityUtils
         .isEqual(_flushThresholdTimeMillis, that._flushThresholdTimeMillis) && 
EqualityUtils
-        .isEqual(_flushSegmentDesiredSizeBytes, 
that._flushSegmentDesiredSizeBytes) && EqualityUtils
+        .isEqual(_flushThresholdSegmentSizeBytes, 
that._flushThresholdSegmentSizeBytes) && EqualityUtils
         .isEqual(_flushAutotuneInitialRows, that._flushAutotuneInitialRows) && 
EqualityUtils.isEqual(_type, that._type)
         && EqualityUtils.isEqual(_topicName, that._topicName) && EqualityUtils
         .isEqual(_consumerTypes, that._consumerTypes) && EqualityUtils
@@ -359,7 +375,7 @@ public class StreamConfig {
     result = EqualityUtils.hashCodeOf(result, _fetchTimeoutMillis);
     result = EqualityUtils.hashCodeOf(result, _flushThresholdRows);
     result = EqualityUtils.hashCodeOf(result, _flushThresholdTimeMillis);
-    result = EqualityUtils.hashCodeOf(result, _flushSegmentDesiredSizeBytes);
+    result = EqualityUtils.hashCodeOf(result, _flushThresholdSegmentSizeBytes);
     result = EqualityUtils.hashCodeOf(result, _flushAutotuneInitialRows);
     result = EqualityUtils.hashCodeOf(result, _decoderClass);
     result = EqualityUtils.hashCodeOf(result, _decoderProperties);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 8284611..ffe92dc 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -51,6 +51,8 @@ public class StreamConfigProperties {
   public static final String SEGMENT_FLUSH_THRESHOLD_TIME = 
"realtime.segment.flush.threshold.time";
 
   /**
+   * @deprecated because the property key is confusing (says size but is 
actually rows). Use {@link StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_ROWS}
+   *
    * Row count flush threshold for realtime segments. This behaves in a 
similar way for HLC and LLC. For HLC,
    * since there is only one consumer per server, this size is used as the 
size of the consumption buffer and
    * determines after how many rows we flush to disk. For example, if this 
threshold is set to two million rows,
@@ -69,9 +71,12 @@ public class StreamConfigProperties {
    * the size of the completed segment is the desired size (see 
REALTIME_DESIRED_SEGMENT_SIZE), unless
    * REALTIME_SEGMENT_FLUSH_TIME is reached first)
    */
-  public static final String SEGMENT_FLUSH_THRESHOLD_ROWS = 
"realtime.segment.flush.threshold.size";
+  public static final String DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS = 
"realtime.segment.flush.threshold.size";
+  public static final String SEGMENT_FLUSH_THRESHOLD_ROWS = 
"realtime.segment.flush.threshold.rows";
 
-  /*
+  /**
+   * @deprecated because the property key is confusing (desired size is not 
indicative of segment size). Use {@link 
StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE}
+   *
    * The desired size of a completed realtime segment.
    * This config is used only if REALTIME_SEGMENT_FLUSH_SIZE is set
    * to 0. Default value of REALTIME_SEGMENT_FLUSH_SIZE is "200M". Values are 
parsed using DataSize class.
@@ -88,13 +93,13 @@ public class StreamConfigProperties {
    *
    * Not included here is any heap memory used (currently inverted index uses 
heap memory for consuming partitions).
    */
-  public static final String SEGMENT_FLUSH_DESIRED_SIZE = 
"realtime.segment.flush.desired.size";
+  public static final String DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE = 
"realtime.segment.flush.desired.size";
+  public static final String SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE = 
"realtime.segment.flush.threshold.segment.size";
 
   /**
    * The initial num rows to use for segment size auto tuning. By default 
100_000 is used.
    */
   public static final String SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS = 
"realtime.segment.flush.autotune.initialRows";
-
   // Time threshold that controller will wait for the segment to be built by 
the server
   public static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = 
"realtime.segment.commit.timeoutSeconds";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to