This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3174e91b96 12508: Feature add segment rows flush config (#12681)
3174e91b96 is described below

commit 3174e91b961224caca980f2b3f5c8556a02908b0
Author: Karthick Venkatesan <[email protected]>
AuthorDate: Fri Apr 5 06:40:10 2024 +0530

    12508: Feature add segment rows flush config (#12681)
---
 .../segment/FixedFlushThresholdUpdater.java        | 39 ++++++++++++++++++++++
 .../segment/FlushThresholdUpdateManager.java       | 20 ++++++++---
 .../Homepage/Operations/AddIngestionComponent.tsx  |  1 +
 .../Operations/AddRealTimeIngestionComponent.tsx   |  1 +
 .../Homepage/Operations/AddRealtimeTableOp.tsx     |  1 +
 .../segment/FlushThresholdUpdaterTest.java         | 32 ++++++++++--------
 .../core/realtime/stream/StreamConfigTest.java     |  6 ++++
 .../segment/local/utils/TableConfigUtils.java      | 17 ++++++++--
 .../org/apache/pinot/spi/stream/StreamConfig.java  | 36 ++++++++++++++++----
 .../pinot/spi/stream/StreamConfigProperties.java   |  6 ++++
 .../apache/pinot/spi/config/ConfigUtilsTest.java   |  1 +
 11 files changed, 133 insertions(+), 27 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FixedFlushThresholdUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FixedFlushThresholdUpdater.java
new file mode 100644
index 0000000000..1f6cea5fab
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FixedFlushThresholdUpdater.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime.segment;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public class FixedFlushThresholdUpdater implements FlushThresholdUpdater {
+  private final int _flushThreshold;
+
+  FixedFlushThresholdUpdater(int flushThreshold) {
+    _flushThreshold = flushThreshold;
+  }
+
+  @Override
+  public void updateFlushThreshold(StreamConfig streamConfig, 
SegmentZKMetadata newSegmentZKMetadata,
+      CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable 
SegmentZKMetadata committingSegmentZKMetadata,
+      int maxNumPartitionsPerInstance) {
+    newSegmentZKMetadata.setSizeThresholdToFlushSegment(_flushThreshold);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index 2d9dca1f3e..0076434b66 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -32,9 +32,17 @@ public class FlushThresholdUpdateManager {
   /**
    * Check table config for flush size.
    *
-   * If flush size > 0, create a new DefaultFlushThresholdUpdater with given 
flush size.
-   * If flush size <= 0, create new SegmentSizeBasedFlushThresholdUpdater if 
not already created. Create only 1 per
-   * table because we want to maintain tuning information for the table in the 
updater.
+   * If flush rows > 0, create a new DefaultFlushThresholdUpdater with the 
given flush size.
+   * If flush segment rows > 0, create a new FixedFlushThresholdUpdater with 
the given number of segment rows.
+   * If flush segment size > 0, create a new 
SegmentSizeBasedFlushThresholdUpdater if not already created. Create only 1
+   * per table because we want to maintain tuning information for the table in 
the updater.
+   *
+   * LEGACY BEHAVIOR:
+   * If flush rows = 0, use segment size based flush threshold.
+   * If none of the above are set, create a new DefaultFlushThresholdUpdater.
+   *
+   * DefaultFlushThresholdUpdater sets the actual segment flush threshold to 
be flush rows divided by max number of
+   * partitions consumed by a server; FixedFlushThresholdUpdater sets the 
actual segment flush threshold as is.
    */
   public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig 
streamConfig) {
     String realtimeTableName = streamConfig.getTableNameWithType();
@@ -44,7 +52,11 @@ public class FlushThresholdUpdateManager {
       _flushThresholdUpdaterMap.remove(realtimeTableName);
       return new DefaultFlushThresholdUpdater(flushThresholdRows);
     }
-
+    int flushThresholdSegmentRows = 
streamConfig.getFlushThresholdSegmentRows();
+    if (flushThresholdSegmentRows > 0) {
+      _flushThresholdUpdaterMap.remove(realtimeTableName);
+      return new FixedFlushThresholdUpdater(flushThresholdSegmentRows);
+    }
     // Legacy behavior: when flush threshold rows is explicitly set to 0, use 
segment size based flush threshold
     long flushThresholdSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
     if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
diff --git 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
index 282d840e8a..8a958e2314 100644
--- 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
+++ 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
@@ -92,6 +92,7 @@ export default function AddIngestionComponent({
             
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
             
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
             "realtime.segment.flush.threshold.rows": "0",
+            "realtime.segment.flush.threshold.segment.rows": "0",
             "realtime.segment.flush.threshold.time": "24h",
             "realtime.segment.flush.threshold.segment.size": "100M"
         }
diff --git 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
index af0f16ab78..287df11705 100644
--- 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
+++ 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
@@ -92,6 +92,7 @@ export default function AddRealTimeIngestionComponent({
             
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
             
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
             "realtime.segment.flush.threshold.rows": "0",
+            "realtime.segment.flush.threshold.segment.rows": "0",
             "realtime.segment.flush.threshold.time": "24h",
             "realtime.segment.flush.threshold.segment.size": "100M"
         }
diff --git 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
index fe44c36334..f71630fded 100644
--- 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
+++ 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
@@ -100,6 +100,7 @@ const defaultTableObj = {
       "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
       "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
       "realtime.segment.flush.threshold.rows": "0",
+      "realtime.segment.flush.threshold.segment.rows": "0",
       "realtime.segment.flush.threshold.time": "24h",
       "realtime.segment.flush.threshold.segment.size": "100M"
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index e7bfd92ec4..9ae04827b6 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -53,34 +53,38 @@ public class FlushThresholdUpdaterTest {
   public void testFlushThresholdUpdateManager() {
     FlushThresholdUpdateManager flushThresholdUpdateManager = new 
FlushThresholdUpdateManager();
 
-    // Neither flush threshold rows nor segment size is set - 
DefaultFlushThresholdUpdater should be returned
+    // No flush threshold is set - DefaultFlushThresholdUpdater should 
be returned
     FlushThresholdUpdater flushThresholdUpdater =
-        
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, -1));
+        
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, -1, 
-1));
     assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
     assertEquals(((DefaultFlushThresholdUpdater) 
flushThresholdUpdater).getTableFlushSize(),
         StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
 
     // Flush threshold rows larger than 0 - DefaultFlushThresholdUpdater 
should be returned
-    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(1234, 
-1));
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(1234, -1, 
-1));
     assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
     assertEquals(((DefaultFlushThresholdUpdater) 
flushThresholdUpdater).getTableFlushSize(), 1234);
 
+    // Flush threshold segment rows larger than 0 - FixedFlushThresholdUpdater 
should be returned
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, 1234, 
-1));
+    assertTrue(flushThresholdUpdater instanceof FixedFlushThresholdUpdater);
+
     // Flush threshold rows set to 0 - SegmentSizeBasedFlushThresholdUpdater 
should be returned
     FlushThresholdUpdater segmentBasedflushThresholdUpdater =
-        
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+        
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1, 
-1));
     assertTrue(segmentBasedflushThresholdUpdater instanceof 
SegmentSizeBasedFlushThresholdUpdater);
 
     // Flush threshold segment size larger than 0 - 
SegmentSizeBasedFlushThresholdUpdater should be returned
-    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, 
1234));
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1, 
1234));
     assertSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
 
     // Flush threshold rows set larger than 0 - DefaultFlushThresholdUpdater 
should be returned
-    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(12345, 
-1));
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(12345, 
-1, -1));
     assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
     assertEquals(((DefaultFlushThresholdUpdater) 
flushThresholdUpdater).getTableFlushSize(), 12345);
 
     // Call again with flush threshold rows set to 0 - a different Object 
should be returned
-    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1, 
-1));
     assertTrue(flushThresholdUpdater instanceof 
SegmentSizeBasedFlushThresholdUpdater);
     assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
     segmentBasedflushThresholdUpdater = flushThresholdUpdater;
@@ -89,15 +93,17 @@ public class FlushThresholdUpdaterTest {
     
flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME);
 
     // Call again with flush threshold rows set to 0 - a different Object 
should be returned
-    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1, 
-1));
     assertTrue(flushThresholdUpdater instanceof 
SegmentSizeBasedFlushThresholdUpdater);
     assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
   }
 
-  private StreamConfig mockStreamConfig(int flushThresholdRows, long 
flushThresholdSegmentSize) {
+  private StreamConfig mockStreamConfig(int flushThresholdRows, int 
flushThresholdSegmentRows,
+      long flushThresholdSegmentSize) {
     StreamConfig streamConfig = mock(StreamConfig.class);
     when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
     when(streamConfig.getFlushThresholdRows()).thenReturn(flushThresholdRows);
+    
when(streamConfig.getFlushThresholdSegmentRows()).thenReturn(flushThresholdSegmentRows);
     
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(flushThresholdSegmentSize);
     return streamConfig;
   }
@@ -131,8 +137,8 @@ public class FlushThresholdUpdaterTest {
     long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
     long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
 
-    for (long[] segmentSizesMB : Arrays
-        .asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) {
+    for (long[] segmentSizesMB : 
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
+        STEPS_SEGMENT_SIZES_MB)) {
       SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
 
       // Start consumption
@@ -168,8 +174,8 @@ public class FlushThresholdUpdaterTest {
     long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
     long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
 
-    for (long[] segmentSizesMB : Arrays
-        .asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) {
+    for (long[] segmentSizesMB : 
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
+        STEPS_SEGMENT_SIZES_MB)) {
       SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
 
       // Start consumption
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index ce3c6e3e74..7b8e7050a2 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -146,6 +146,7 @@ public class StreamConfigTest {
     assertEquals(streamConfig.getFetchTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
     assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
     assertEquals(streamConfig.getFlushThresholdRows(), -1);
+    assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     String offsetCriteria = "smallest";
@@ -183,6 +184,7 @@ public class StreamConfigTest {
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
     assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
+    assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), 
DataSizeUtils.toBytes(flushSegmentSize));
 
     // Backward compatibility check for flushThresholdTime
@@ -329,6 +331,7 @@ public class StreamConfigTest {
     streamConfig = new StreamConfig(tableName, streamConfigMap);
     assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
     assertEquals(streamConfig.getFlushThresholdRows(), -1);
+    assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     // Use regular values if provided
@@ -338,6 +341,7 @@ public class StreamConfigTest {
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
     assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
+    assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     // Use regular values if both regular and llc config exists
@@ -350,6 +354,7 @@ public class StreamConfigTest {
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
     assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
+    assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     // Use llc values if only llc config exists
@@ -359,6 +364,7 @@ public class StreamConfigTest {
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTimeLLC));
     assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRowsLLC));
+    assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index a713c81492..3681e7e317 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -588,10 +588,21 @@ public final class TableConfigUtils {
     Preconditions.checkState(streamConfig.getFlushThresholdTimeMillis() > 0, 
"Invalid flush threshold time: %s",
         streamConfig.getFlushThresholdTimeMillis());
     int flushThresholdRows = streamConfig.getFlushThresholdRows();
+    int flushThresholdSegmentRows = 
streamConfig.getFlushThresholdSegmentRows();
     long flushThresholdSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
-    Preconditions.checkState(!(flushThresholdRows > 0 && 
flushThresholdSegmentSizeBytes > 0),
-        "Flush threshold rows: %s and flush threshold segment size: %s cannot 
be both set", flushThresholdRows,
-        flushThresholdSegmentSizeBytes);
+    int numFlushThresholdSet = 0;
+    if (flushThresholdRows > 0) {
+      numFlushThresholdSet++;
+    }
+    if (flushThresholdSegmentRows > 0) {
+      numFlushThresholdSet++;
+    }
+    if (flushThresholdSegmentSizeBytes > 0) {
+      numFlushThresholdSet++;
+    }
+    Preconditions.checkState(numFlushThresholdSet <= 1,
+        "Only 1 of flush threshold (rows: %s, segment rows: %s, segment size: 
%s) can be set", flushThresholdRows,
+        flushThresholdSegmentRows, flushThresholdSegmentSizeBytes);
 
     // Validate decoder
     if 
(streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder"))
 {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 32bea7aa83..2a23b6cde9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -66,6 +66,7 @@ public class StreamConfig {
   private final long _idleTimeoutMillis;
 
   private final int _flushThresholdRows;
+  private final int _flushThresholdSegmentRows;
   private final long _flushThresholdTimeMillis;
   private final long _flushThresholdSegmentSizeBytes;
   private final int _flushAutotuneInitialRows; // initial num rows to use for 
SegmentSizeBasedFlushThresholdUpdater
@@ -176,6 +177,7 @@ public class StreamConfig {
     _idleTimeoutMillis = idleTimeoutMillis;
 
     _flushThresholdRows = extractFlushThresholdRows(streamConfigMap);
+    _flushThresholdSegmentRows = 
extractFlushThresholdSegmentRows(streamConfigMap);
     _flushThresholdTimeMillis = 
extractFlushThresholdTimeMillis(streamConfigMap);
     _flushThresholdSegmentSizeBytes = 
extractFlushThresholdSegmentSize(streamConfigMap);
     _serverUploadToDeepStore = Boolean.parseBoolean(
@@ -264,6 +266,20 @@ public class StreamConfig {
     }
   }
 
+  protected int extractFlushThresholdSegmentRows(Map<String, String> 
streamConfigMap) {
+    String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS;
+    String flushThresholdSegmentRowsStr = streamConfigMap.get(key);
+    if (flushThresholdSegmentRowsStr != null) {
+      try {
+        return Integer.parseInt(flushThresholdSegmentRowsStr);
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Invalid config " + key + ": " + 
flushThresholdSegmentRowsStr);
+      }
+    } else {
+      return -1;
+    }
+  }
+
   protected long extractFlushThresholdTimeMillis(Map<String, String> 
streamConfigMap) {
     String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME;
     String flushThresholdTimeStr = streamConfigMap.get(key);
@@ -342,6 +358,10 @@ public class StreamConfig {
     return _flushThresholdRows;
   }
 
+  public int getFlushThresholdSegmentRows() {
+    return _flushThresholdSegmentRows;
+  }
+
   public long getFlushThresholdTimeMillis() {
     return _flushThresholdTimeMillis;
   }
@@ -378,11 +398,11 @@ public class StreamConfig {
         + ", _decoderClass='" + _decoderClass + '\'' + ", _decoderProperties=" 
+ _decoderProperties
         + ", _connectionTimeoutMillis=" + _connectionTimeoutMillis + ", 
_fetchTimeoutMillis=" + _fetchTimeoutMillis
         + ", _idleTimeoutMillis=" + _idleTimeoutMillis + ", 
_flushThresholdRows=" + _flushThresholdRows
-        + ", _flushThresholdTimeMillis=" + _flushThresholdTimeMillis + ", 
_flushThresholdSegmentSizeBytes="
-        + _flushThresholdSegmentSizeBytes + ", _flushAutotuneInitialRows=" + 
_flushAutotuneInitialRows + ", _groupId='"
-        + _groupId + '\'' + ", _topicConsumptionRateLimit=" + 
_topicConsumptionRateLimit + ", _streamConfigMap="
-        + _streamConfigMap + ", _offsetCriteria=" + _offsetCriteria + ", 
_serverUploadToDeepStore="
-        + _serverUploadToDeepStore + '}';
+        + ", _flushThresholdSegmentRows=" + _flushThresholdSegmentRows + ", 
_flushThresholdTimeMillis="
+        + _flushThresholdTimeMillis + ", _flushThresholdSegmentSizeBytes=" + 
_flushThresholdSegmentSizeBytes
+        + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ", 
_groupId='" + _groupId + '\''
+        + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit + ", 
_streamConfigMap=" + _streamConfigMap
+        + ", _offsetCriteria=" + _offsetCriteria + ", 
_serverUploadToDeepStore=" + _serverUploadToDeepStore + '}';
   }
 
   @Override
@@ -396,6 +416,7 @@ public class StreamConfig {
     StreamConfig that = (StreamConfig) o;
     return _connectionTimeoutMillis == that._connectionTimeoutMillis && 
_fetchTimeoutMillis == that._fetchTimeoutMillis
         && _idleTimeoutMillis == that._idleTimeoutMillis && 
_flushThresholdRows == that._flushThresholdRows
+        && _flushThresholdSegmentRows == that._flushThresholdSegmentRows
         && _flushThresholdTimeMillis == that._flushThresholdTimeMillis
         && _flushThresholdSegmentSizeBytes == 
that._flushThresholdSegmentSizeBytes
         && _flushAutotuneInitialRows == that._flushAutotuneInitialRows
@@ -412,7 +433,8 @@ public class StreamConfig {
   public int hashCode() {
     return Objects.hash(_type, _topicName, _tableNameWithType, 
_consumerFactoryClassName, _decoderClass,
         _decoderProperties, _connectionTimeoutMillis, _fetchTimeoutMillis, 
_idleTimeoutMillis, _flushThresholdRows,
-        _flushThresholdTimeMillis, _flushThresholdSegmentSizeBytes, 
_flushAutotuneInitialRows, _groupId,
-        _topicConsumptionRateLimit, _streamConfigMap, _offsetCriteria, 
_serverUploadToDeepStore);
+        _flushThresholdSegmentRows, _flushThresholdTimeMillis, 
_flushThresholdSegmentSizeBytes,
+        _flushAutotuneInitialRows, _groupId, _topicConsumptionRateLimit, 
_streamConfigMap, _offsetCriteria,
+        _serverUploadToDeepStore);
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index df8dc87426..d78936c05b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -82,6 +82,12 @@ public class StreamConfigProperties {
   public static final String DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS = 
"realtime.segment.flush.threshold.size";
   public static final String SEGMENT_FLUSH_THRESHOLD_ROWS = 
"realtime.segment.flush.threshold.rows";
 
+  /**
+   * Config is similar to {@link 
StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_ROWS} but independent of
+   * partition count. This is useful when a segment should be flushed based 
exactly on the number of rows it contains.
+   */
+  public static final String SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS = 
"realtime.segment.flush.threshold.segment.rows";
+
   /**
    * @deprecated because the property key is confusing (desired size is not 
indicative of segment size).
    * Use {@link StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE}
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
index eeee659453..07028659fa 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
@@ -109,6 +109,7 @@ public class ConfigUtilsTest {
     assertEquals(streamConfig.getFetchTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
     assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
     assertEquals(streamConfig.getFlushThresholdRows(), -1);
+    assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to