This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3174e91b96 12508: Feature add segment rows flush config (#12681)
3174e91b96 is described below
commit 3174e91b961224caca980f2b3f5c8556a02908b0
Author: Karthick Venkatesan <[email protected]>
AuthorDate: Fri Apr 5 06:40:10 2024 +0530
12508: Feature add segment rows flush config (#12681)
---
.../segment/FixedFlushThresholdUpdater.java | 39 ++++++++++++++++++++++
.../segment/FlushThresholdUpdateManager.java | 20 ++++++++---
.../Homepage/Operations/AddIngestionComponent.tsx | 1 +
.../Operations/AddRealTimeIngestionComponent.tsx | 1 +
.../Homepage/Operations/AddRealtimeTableOp.tsx | 1 +
.../segment/FlushThresholdUpdaterTest.java | 32 ++++++++++--------
.../core/realtime/stream/StreamConfigTest.java | 6 ++++
.../segment/local/utils/TableConfigUtils.java | 17 ++++++++--
.../org/apache/pinot/spi/stream/StreamConfig.java | 36 ++++++++++++++++----
.../pinot/spi/stream/StreamConfigProperties.java | 6 ++++
.../apache/pinot/spi/config/ConfigUtilsTest.java | 1 +
11 files changed, 133 insertions(+), 27 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FixedFlushThresholdUpdater.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FixedFlushThresholdUpdater.java
new file mode 100644
index 0000000000..1f6cea5fab
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FixedFlushThresholdUpdater.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime.segment;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public class FixedFlushThresholdUpdater implements FlushThresholdUpdater {
+ private final int _flushThreshold;
+
+ FixedFlushThresholdUpdater(int flushThreshold) {
+ _flushThreshold = flushThreshold;
+ }
+
+ @Override
+ public void updateFlushThreshold(StreamConfig streamConfig,
SegmentZKMetadata newSegmentZKMetadata,
+ CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable
SegmentZKMetadata committingSegmentZKMetadata,
+ int maxNumPartitionsPerInstance) {
+ newSegmentZKMetadata.setSizeThresholdToFlushSegment(_flushThreshold);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index 2d9dca1f3e..0076434b66 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -32,9 +32,17 @@ public class FlushThresholdUpdateManager {
/**
* Check table config for flush size.
*
- * If flush size > 0, create a new DefaultFlushThresholdUpdater with given
flush size.
- * If flush size <= 0, create new SegmentSizeBasedFlushThresholdUpdater if
not already created. Create only 1 per
- * table because we want to maintain tuning information for the table in the
updater.
+ * If flush rows > 0, create a new DefaultFlushThresholdUpdater with the
given flush size.
+ * If flush segment rows > 0, create a new FixedFlushThresholdUpdater with
the given flush size.
+ * If flush segment size > 0, create a new
SegmentSizeBasedFlushThresholdUpdater if not already created. Create only 1
+ * per table because we want to maintain tuning information for the table in
the updater.
+ *
+ * LEGACY BEHAVIOR:
+ * If flush rows = 0, use segment size based flush threshold.
+ * If none of the above are set, create a new DefaultFlushThresholdUpdater.
+ *
+ * DefaultFlushThresholdUpdater sets the actual segment flush threshold to
be flush rows divided by max number of
+ * partitions consumed by a server; FixedFlushThresholdUpdater sets the
actual segment flush threshold as is.
*/
public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig
streamConfig) {
String realtimeTableName = streamConfig.getTableNameWithType();
@@ -44,7 +52,11 @@ public class FlushThresholdUpdateManager {
_flushThresholdUpdaterMap.remove(realtimeTableName);
return new DefaultFlushThresholdUpdater(flushThresholdRows);
}
-
+ int flushThresholdSegmentRows =
streamConfig.getFlushThresholdSegmentRows();
+ if (flushThresholdSegmentRows > 0) {
+ _flushThresholdUpdaterMap.remove(realtimeTableName);
+ return new FixedFlushThresholdUpdater(flushThresholdSegmentRows);
+ }
// Legacy behavior: when flush threshold rows is explicitly set to 0, use
segment size based flush threshold
long flushThresholdSegmentSizeBytes =
streamConfig.getFlushThresholdSegmentSizeBytes();
if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
diff --git
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
index 282d840e8a..8a958e2314 100644
---
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
+++
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
@@ -92,6 +92,7 @@ export default function AddIngestionComponent({
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
+ "realtime.segment.flush.threshold.segment.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.segment.size": "100M"
}
diff --git
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
index af0f16ab78..287df11705 100644
---
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
+++
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
@@ -92,6 +92,7 @@ export default function AddRealTimeIngestionComponent({
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
+ "realtime.segment.flush.threshold.segment.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.segment.size": "100M"
}
diff --git
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
index fe44c36334..f71630fded 100644
---
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
+++
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
@@ -100,6 +100,7 @@ const defaultTableObj = {
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
+ "realtime.segment.flush.threshold.segment.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.segment.size": "100M"
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index e7bfd92ec4..9ae04827b6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -53,34 +53,38 @@ public class FlushThresholdUpdaterTest {
public void testFlushThresholdUpdateManager() {
FlushThresholdUpdateManager flushThresholdUpdateManager = new
FlushThresholdUpdateManager();
- // Neither flush threshold rows nor segment size is set -
DefaultFlushThresholdUpdater should be returned
+ // None of the flush thresholds is set - DefaultFlushThresholdUpdater should
be returned
FlushThresholdUpdater flushThresholdUpdater =
-
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, -1));
+
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, -1,
-1));
assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
assertEquals(((DefaultFlushThresholdUpdater)
flushThresholdUpdater).getTableFlushSize(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
// Flush threshold rows larger than 0 - DefaultFlushThresholdUpdater
should be returned
- flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(1234,
-1));
+ flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(1234, -1,
-1));
assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
assertEquals(((DefaultFlushThresholdUpdater)
flushThresholdUpdater).getTableFlushSize(), 1234);
+ // Flush threshold segment rows larger than 0 - FixedFlushThresholdUpdater
should be returned
+ flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, 1234,
-1));
+ assertTrue(flushThresholdUpdater instanceof FixedFlushThresholdUpdater);
+
// Flush threshold rows set to 0 - SegmentSizeBasedFlushThresholdUpdater
should be returned
FlushThresholdUpdater segmentBasedflushThresholdUpdater =
-
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1,
-1));
assertTrue(segmentBasedflushThresholdUpdater instanceof
SegmentSizeBasedFlushThresholdUpdater);
// Flush threshold segment size larger than 0 -
SegmentSizeBasedFlushThresholdUpdater should be returned
- flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1,
1234));
+ flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1,
1234));
assertSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
// Flush threshold rows set larger than 0 - DefaultFlushThresholdUpdater
should be returned
- flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(12345,
-1));
+ flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(12345,
-1, -1));
assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
assertEquals(((DefaultFlushThresholdUpdater)
flushThresholdUpdater).getTableFlushSize(), 12345);
// Call again with flush threshold rows set to 0 - a different Object
should be returned
- flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+ flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1,
-1));
assertTrue(flushThresholdUpdater instanceof
SegmentSizeBasedFlushThresholdUpdater);
assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
segmentBasedflushThresholdUpdater = flushThresholdUpdater;
@@ -89,15 +93,17 @@ public class FlushThresholdUpdaterTest {
flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME);
// Call again with flush threshold rows set to 0 - a different Object
should be returned
- flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+ flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1,
-1));
assertTrue(flushThresholdUpdater instanceof
SegmentSizeBasedFlushThresholdUpdater);
assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
}
- private StreamConfig mockStreamConfig(int flushThresholdRows, long
flushThresholdSegmentSize) {
+ private StreamConfig mockStreamConfig(int flushThresholdRows, int
flushThresholdSegmentRows,
+ long flushThresholdSegmentSize) {
StreamConfig streamConfig = mock(StreamConfig.class);
when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
when(streamConfig.getFlushThresholdRows()).thenReturn(flushThresholdRows);
+
when(streamConfig.getFlushThresholdSegmentRows()).thenReturn(flushThresholdSegmentRows);
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(flushThresholdSegmentSize);
return streamConfig;
}
@@ -131,8 +137,8 @@ public class FlushThresholdUpdaterTest {
long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
- for (long[] segmentSizesMB : Arrays
- .asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) {
+ for (long[] segmentSizesMB :
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
+ STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new
SegmentSizeBasedFlushThresholdUpdater();
// Start consumption
@@ -168,8 +174,8 @@ public class FlushThresholdUpdaterTest {
long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
- for (long[] segmentSizesMB : Arrays
- .asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) {
+ for (long[] segmentSizesMB :
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
+ STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new
SegmentSizeBasedFlushThresholdUpdater();
// Start consumption
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index ce3c6e3e74..7b8e7050a2 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -146,6 +146,7 @@ public class StreamConfigTest {
assertEquals(streamConfig.getFetchTimeoutMillis(),
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
assertEquals(streamConfig.getFlushThresholdTimeMillis(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
assertEquals(streamConfig.getFlushThresholdRows(), -1);
+ assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
String offsetCriteria = "smallest";
@@ -183,6 +184,7 @@ public class StreamConfigTest {
assertEquals(streamConfig.getFlushThresholdTimeMillis(),
(long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
assertEquals(streamConfig.getFlushThresholdRows(),
Integer.parseInt(flushThresholdRows));
+ assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
DataSizeUtils.toBytes(flushSegmentSize));
// Backward compatibility check for flushThresholdTime
@@ -329,6 +331,7 @@ public class StreamConfigTest {
streamConfig = new StreamConfig(tableName, streamConfigMap);
assertEquals(streamConfig.getFlushThresholdTimeMillis(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
assertEquals(streamConfig.getFlushThresholdRows(), -1);
+ assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
// Use regular values if provided
@@ -338,6 +341,7 @@ public class StreamConfigTest {
assertEquals(streamConfig.getFlushThresholdTimeMillis(),
(long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
assertEquals(streamConfig.getFlushThresholdRows(),
Integer.parseInt(flushThresholdRows));
+ assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
// Use regular values if both regular and llc config exists
@@ -350,6 +354,7 @@ public class StreamConfigTest {
assertEquals(streamConfig.getFlushThresholdTimeMillis(),
(long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
assertEquals(streamConfig.getFlushThresholdRows(),
Integer.parseInt(flushThresholdRows));
+ assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
// Use llc values if only llc config exists
@@ -359,6 +364,7 @@ public class StreamConfigTest {
assertEquals(streamConfig.getFlushThresholdTimeMillis(),
(long) TimeUtils.convertPeriodToMillis(flushThresholdTimeLLC));
assertEquals(streamConfig.getFlushThresholdRows(),
Integer.parseInt(flushThresholdRowsLLC));
+ assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index a713c81492..3681e7e317 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -588,10 +588,21 @@ public final class TableConfigUtils {
Preconditions.checkState(streamConfig.getFlushThresholdTimeMillis() > 0,
"Invalid flush threshold time: %s",
streamConfig.getFlushThresholdTimeMillis());
int flushThresholdRows = streamConfig.getFlushThresholdRows();
+ int flushThresholdSegmentRows =
streamConfig.getFlushThresholdSegmentRows();
long flushThresholdSegmentSizeBytes =
streamConfig.getFlushThresholdSegmentSizeBytes();
- Preconditions.checkState(!(flushThresholdRows > 0 &&
flushThresholdSegmentSizeBytes > 0),
- "Flush threshold rows: %s and flush threshold segment size: %s cannot
be both set", flushThresholdRows,
- flushThresholdSegmentSizeBytes);
+ int numFlushThresholdSet = 0;
+ if (flushThresholdRows > 0) {
+ numFlushThresholdSet++;
+ }
+ if (flushThresholdSegmentRows > 0) {
+ numFlushThresholdSet++;
+ }
+ if (flushThresholdSegmentSizeBytes > 0) {
+ numFlushThresholdSet++;
+ }
+ Preconditions.checkState(numFlushThresholdSet <= 1,
+ "Only 1 of flush threshold (rows: %s, segment rows: %s, segment size:
%s) can be set", flushThresholdRows,
+ flushThresholdSegmentRows, flushThresholdSegmentSizeBytes);
// Validate decoder
if
(streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder"))
{
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 32bea7aa83..2a23b6cde9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -66,6 +66,7 @@ public class StreamConfig {
private final long _idleTimeoutMillis;
private final int _flushThresholdRows;
+ private final int _flushThresholdSegmentRows;
private final long _flushThresholdTimeMillis;
private final long _flushThresholdSegmentSizeBytes;
private final int _flushAutotuneInitialRows; // initial num rows to use for
SegmentSizeBasedFlushThresholdUpdater
@@ -176,6 +177,7 @@ public class StreamConfig {
_idleTimeoutMillis = idleTimeoutMillis;
_flushThresholdRows = extractFlushThresholdRows(streamConfigMap);
+ _flushThresholdSegmentRows =
extractFlushThresholdSegmentRows(streamConfigMap);
_flushThresholdTimeMillis =
extractFlushThresholdTimeMillis(streamConfigMap);
_flushThresholdSegmentSizeBytes =
extractFlushThresholdSegmentSize(streamConfigMap);
_serverUploadToDeepStore = Boolean.parseBoolean(
@@ -264,6 +266,20 @@ public class StreamConfig {
}
}
+ protected int extractFlushThresholdSegmentRows(Map<String, String>
streamConfigMap) {
+ String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS;
+ String flushThresholdSegmentRowsStr = streamConfigMap.get(key);
+ if (flushThresholdSegmentRowsStr != null) {
+ try {
+ return Integer.parseInt(flushThresholdSegmentRowsStr);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid config " + key + ": " +
flushThresholdSegmentRowsStr);
+ }
+ } else {
+ return -1;
+ }
+ }
+
protected long extractFlushThresholdTimeMillis(Map<String, String>
streamConfigMap) {
String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME;
String flushThresholdTimeStr = streamConfigMap.get(key);
@@ -342,6 +358,10 @@ public class StreamConfig {
return _flushThresholdRows;
}
+ public int getFlushThresholdSegmentRows() {
+ return _flushThresholdSegmentRows;
+ }
+
public long getFlushThresholdTimeMillis() {
return _flushThresholdTimeMillis;
}
@@ -378,11 +398,11 @@ public class StreamConfig {
+ ", _decoderClass='" + _decoderClass + '\'' + ", _decoderProperties="
+ _decoderProperties
+ ", _connectionTimeoutMillis=" + _connectionTimeoutMillis + ",
_fetchTimeoutMillis=" + _fetchTimeoutMillis
+ ", _idleTimeoutMillis=" + _idleTimeoutMillis + ",
_flushThresholdRows=" + _flushThresholdRows
- + ", _flushThresholdTimeMillis=" + _flushThresholdTimeMillis + ",
_flushThresholdSegmentSizeBytes="
- + _flushThresholdSegmentSizeBytes + ", _flushAutotuneInitialRows=" +
_flushAutotuneInitialRows + ", _groupId='"
- + _groupId + '\'' + ", _topicConsumptionRateLimit=" +
_topicConsumptionRateLimit + ", _streamConfigMap="
- + _streamConfigMap + ", _offsetCriteria=" + _offsetCriteria + ",
_serverUploadToDeepStore="
- + _serverUploadToDeepStore + '}';
+ + ", _flushThresholdSegmentRows=" + _flushThresholdSegmentRows + ",
_flushThresholdTimeMillis="
+ + _flushThresholdTimeMillis + ", _flushThresholdSegmentSizeBytes=" +
_flushThresholdSegmentSizeBytes
+ + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ",
_groupId='" + _groupId + '\''
+ + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit + ",
_streamConfigMap=" + _streamConfigMap
+ + ", _offsetCriteria=" + _offsetCriteria + ",
_serverUploadToDeepStore=" + _serverUploadToDeepStore + '}';
}
@Override
@@ -396,6 +416,7 @@ public class StreamConfig {
StreamConfig that = (StreamConfig) o;
return _connectionTimeoutMillis == that._connectionTimeoutMillis &&
_fetchTimeoutMillis == that._fetchTimeoutMillis
&& _idleTimeoutMillis == that._idleTimeoutMillis &&
_flushThresholdRows == that._flushThresholdRows
+ && _flushThresholdSegmentRows == that._flushThresholdSegmentRows
&& _flushThresholdTimeMillis == that._flushThresholdTimeMillis
&& _flushThresholdSegmentSizeBytes ==
that._flushThresholdSegmentSizeBytes
&& _flushAutotuneInitialRows == that._flushAutotuneInitialRows
@@ -412,7 +433,8 @@ public class StreamConfig {
public int hashCode() {
return Objects.hash(_type, _topicName, _tableNameWithType,
_consumerFactoryClassName, _decoderClass,
_decoderProperties, _connectionTimeoutMillis, _fetchTimeoutMillis,
_idleTimeoutMillis, _flushThresholdRows,
- _flushThresholdTimeMillis, _flushThresholdSegmentSizeBytes,
_flushAutotuneInitialRows, _groupId,
- _topicConsumptionRateLimit, _streamConfigMap, _offsetCriteria,
_serverUploadToDeepStore);
+ _flushThresholdSegmentRows, _flushThresholdTimeMillis,
_flushThresholdSegmentSizeBytes,
+ _flushAutotuneInitialRows, _groupId, _topicConsumptionRateLimit,
_streamConfigMap, _offsetCriteria,
+ _serverUploadToDeepStore);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index df8dc87426..d78936c05b 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -82,6 +82,12 @@ public class StreamConfigProperties {
public static final String DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS =
"realtime.segment.flush.threshold.size";
public static final String SEGMENT_FLUSH_THRESHOLD_ROWS =
"realtime.segment.flush.threshold.rows";
+ /**
+ * Config is similar to {@link
StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_ROWS} but independent of
+ * partition count. This is useful when we want to flush a segment based
exactly on the number of rows in that segment.
+ */
+ public static final String SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS =
"realtime.segment.flush.threshold.segment.rows";
+
/**
* @deprecated because the property key is confusing (desired size is not
indicative of segment size).
* Use {@link StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
index eeee659453..07028659fa 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
@@ -109,6 +109,7 @@ public class ConfigUtilsTest {
assertEquals(streamConfig.getFetchTimeoutMillis(),
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
assertEquals(streamConfig.getFlushThresholdTimeMillis(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
assertEquals(streamConfig.getFlushThresholdRows(), -1);
+ assertEquals(streamConfig.getFlushThresholdSegmentRows(), -1);
assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]