This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 42c6424 Make realtime threshold property names less ambiguous (#5953)
42c6424 is described below
commit 42c6424a56f5fd1c55b8da418fab703e2b259fdb
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Sep 1 10:44:51 2020 -0700
Make realtime threshold property names less ambiguous (#5953)
---
.../SegmentSizeBasedFlushThresholdUpdater.java | 2 +-
.../segment/FlushThresholdUpdaterTest.java | 9 ++-
.../core/realtime/stream/StreamConfigTest.java | 49 ++++++++++++----
.../spi/stream/PartitionLevelStreamConfig.java | 2 +-
.../org/apache/pinot/spi/stream/StreamConfig.java | 68 +++++++++++++---------
.../pinot/spi/stream/StreamConfigProperties.java | 13 +++--
6 files changed, 94 insertions(+), 49 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 2b601f3..2e73806 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -56,7 +56,7 @@ public class SegmentSizeBasedFlushThresholdUpdater implements
FlushThresholdUpda
public synchronized void updateFlushThreshold(PartitionLevelStreamConfig
streamConfig,
LLCRealtimeSegmentZKMetadata newSegmentZKMetadata,
CommittingSegmentDescriptor committingSegmentDescriptor,
@Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int
maxNumPartitionsPerInstance) {
- final long desiredSegmentSizeBytes =
streamConfig.getFlushSegmentDesiredSizeBytes();
+ final long desiredSegmentSizeBytes =
streamConfig.getFlushThresholdSegmentSizeBytes();
final long timeThresholdMillis =
streamConfig.getFlushThresholdTimeMillis();
final int autotuneInitialRows = streamConfig.getFlushAutotuneInitialRows();
final long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 98c02b1..396e4bd 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -24,7 +24,6 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
@@ -101,14 +100,14 @@ public class FlushThresholdUpdaterTest {
PartitionLevelStreamConfig streamConfig =
mock(PartitionLevelStreamConfig.class);
when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
when(streamConfig.getFlushThresholdRows()).thenReturn(0);
-
when(streamConfig.getFlushSegmentDesiredSizeBytes()).thenReturn(flushSegmentDesiredSizeBytes);
+
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(flushSegmentDesiredSizeBytes);
when(streamConfig.getFlushThresholdTimeMillis()).thenReturn(flushThresholdTimeMillis);
when(streamConfig.getFlushAutotuneInitialRows()).thenReturn(flushAutotuneInitialRows);
return streamConfig;
}
private PartitionLevelStreamConfig mockDefaultAutotuneStreamConfig() {
- return
mockAutotuneStreamConfig(StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES,
+ return
mockAutotuneStreamConfig(StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES,
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS,
StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS);
}
@@ -121,7 +120,7 @@ public class FlushThresholdUpdaterTest {
@Test
public void testSegmentSizeBasedFlushThreshold() {
PartitionLevelStreamConfig streamConfig =
mockDefaultAutotuneStreamConfig();
- long desiredSegmentSizeBytes =
streamConfig.getFlushSegmentDesiredSizeBytes();
+ long desiredSegmentSizeBytes =
streamConfig.getFlushThresholdSegmentSizeBytes();
long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
@@ -307,7 +306,7 @@ public class FlushThresholdUpdaterTest {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new
SegmentSizeBasedFlushThresholdUpdater();
// Use customized stream config
- long flushSegmentDesiredSizeBytes =
StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES / 2;
+ long flushSegmentDesiredSizeBytes =
StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2;
long flushThresholdTimeMillis =
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS / 2;
int flushAutotuneInitialRows =
StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS / 2;
PartitionLevelStreamConfig streamConfig =
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index af09f30..e5855dd 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -193,8 +193,8 @@ public class StreamConfigTest {
Assert.assertEquals(streamConfig.getFetchTimeoutMillis(),
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(streamConfig.getFlushThresholdRows(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
- Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
- StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES);
+ Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
+ StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
consumerType = "lowLevel,highLevel";
String offsetCriteria = "smallest";
@@ -221,7 +221,7 @@ public class StreamConfigTest {
fetchTimeout);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
flushThresholdRows);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME,
flushThresholdTime);
- streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE,
flushSegmentSize);
+
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE,
flushSegmentSize);
streamConfig = new StreamConfig(tableName, streamConfigMap);
Assert.assertEquals(streamConfig.getType(), streamType);
@@ -238,7 +238,7 @@ public class StreamConfigTest {
Assert.assertEquals(streamConfig.getFlushThresholdRows(),
Integer.parseInt(flushThresholdRows));
Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(),
(long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
- Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
DataSizeUtils.toBytes(flushSegmentSize));
+ Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
DataSizeUtils.toBytes(flushSegmentSize));
// Backward compatibility check for flushThresholdTime
flushThresholdTime = "18000000";
@@ -249,6 +249,18 @@ public class StreamConfigTest {
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME,
flushThresholdTime);
streamConfig = new StreamConfig(tableName, streamConfigMap);
Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
+
+ // Backward compatibility check for flush threshold rows
+
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS,
"10000");
+ streamConfig = new StreamConfig(tableName, streamConfigMap);
+ Assert.assertEquals(streamConfig.getFlushThresholdRows(), 10000);
+
+ // Backward compatibility check for flush threshold segment size
+
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE);
+
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE,
"10M");
+ streamConfig = new StreamConfig(tableName, streamConfigMap);
+ Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
DataSizeUtils.toBytes("10M"));
}
/**
@@ -313,25 +325,38 @@ public class StreamConfigTest {
Assert
.assertEquals(streamConfig.getConnectionTimeoutMillis(),
StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
- // Invalid flush threshold rows
+ // Invalid flush threshold rows - deprecated property
streamConfigMap.remove(StreamConfigProperties
.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS));
+
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS,
"rows");
+ streamConfig = new StreamConfig(tableName, streamConfigMap);
+ Assert.assertEquals(streamConfig.getFlushThresholdRows(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
+
+ // Invalid flush threshold rows - new property
+
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
"rows");
streamConfig = new StreamConfig(tableName, streamConfigMap);
Assert.assertEquals(streamConfig.getFlushThresholdRows(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
// Invalid flush threshold time
-
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME,
"time");
streamConfig = new StreamConfig(tableName, streamConfigMap);
Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
- // Invalid flush segment size
+ // Invalid flush segment size - deprecated property
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
- streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE,
"size");
+
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE,
"size");
+ streamConfig = new StreamConfig(tableName, streamConfigMap);
+ Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
+ StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
+
+ // Invalid flush segment size - new property
+
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE);
+
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE,
"size");
streamConfig = new StreamConfig(tableName, streamConfigMap);
- Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
- StreamConfig.DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES);
+ Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
+ StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
}
/**
@@ -380,7 +405,7 @@ public class StreamConfigTest {
(long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
// llc overrides provided, but base values will be picked in base
StreamConfigs
- streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS +
StreamConfigProperties.LLC_SUFFIX,
+
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS
+ StreamConfigProperties.LLC_SUFFIX,
flushThresholdRowsLLC);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME +
StreamConfigProperties.LLC_SUFFIX,
flushThresholdTimeLLC);
@@ -403,7 +428,7 @@ public class StreamConfigTest {
(long) TimeUtils.convertPeriodToMillis(flushThresholdTimeLLC));
// PartitionLevelStreamConfig should use base values if llc overrides not
provided
- streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS
+ StreamConfigProperties.LLC_SUFFIX);
+
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS
+ StreamConfigProperties.LLC_SUFFIX);
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME
+ StreamConfigProperties.LLC_SUFFIX);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
flushThresholdRows);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME,
flushThresholdTime);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
index 0f6a383..e545b94 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelStreamConfig.java
@@ -39,7 +39,7 @@ public class PartitionLevelStreamConfig extends StreamConfig {
@Override
protected int extractFlushThresholdRows(Map<String, String> streamConfigMap)
{
String flushThresholdRowsKey =
- StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS +
StreamConfigProperties.LLC_SUFFIX;
+ StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS +
StreamConfigProperties.LLC_SUFFIX;
String flushThresholdRowsStr = streamConfigMap.get(flushThresholdRowsKey);
if (flushThresholdRowsStr != null) {
try {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 817bcd5..d343203 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -46,7 +46,7 @@ public class StreamConfig {
public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS =
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
- public static final long DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES = 200 *
1024 * 1024; // 200M
+ public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 *
1024 * 1024; // 200M
public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
@@ -71,7 +71,7 @@ public class StreamConfig {
private final int _flushThresholdRows;
private final long _flushThresholdTimeMillis;
- private final long _flushSegmentDesiredSizeBytes;
+ private final long _flushThresholdSegmentSizeBytes;
private final int _flushAutotuneInitialRows; // initial num rows to use for
SegmentSizeBasedFlushThresholdUpdater
private final String _groupId;
@@ -163,22 +163,7 @@ public class StreamConfig {
_flushThresholdRows = extractFlushThresholdRows(streamConfigMap);
_flushThresholdTimeMillis =
extractFlushThresholdTimeMillis(streamConfigMap);
-
- long flushDesiredSize = -1;
- String flushSegmentDesiredSizeValue =
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE);
- if (flushSegmentDesiredSizeValue != null) {
- try {
- flushDesiredSize = DataSizeUtils.toBytes(flushSegmentDesiredSizeValue);
- } catch (Exception e) {
- LOGGER.warn("Invalid config {}: {}, defaulting to: {}",
StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE,
- flushSegmentDesiredSizeValue,
DataSizeUtils.fromBytes(DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES));
- }
- }
- if (flushDesiredSize > 0) {
- _flushSegmentDesiredSizeBytes = flushDesiredSize;
- } else {
- _flushSegmentDesiredSizeBytes = DEFAULT_FLUSH_SEGMENT_DESIRED_SIZE_BYTES;
- }
+ _flushThresholdSegmentSizeBytes =
extractFlushThresholdSegmentSize(streamConfigMap);
int autotuneInitialRows = 0;
String initialRowsValue =
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
@@ -199,8 +184,39 @@ public class StreamConfig {
_streamConfigMap.putAll(streamConfigMap);
}
+ private long extractFlushThresholdSegmentSize(Map<String, String>
streamConfigMap) {
+ long segmentSizeBytes = -1;
+ String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE;
+ String flushThresholdSegmentSizeStr = streamConfigMap.get(key);
+ if (flushThresholdSegmentSizeStr == null) {
+ // for backward compatibility with older property
+ key = StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE;
+ flushThresholdSegmentSizeStr = streamConfigMap.get(key);
+ }
+
+ if (flushThresholdSegmentSizeStr != null) {
+ try {
+ segmentSizeBytes = DataSizeUtils.toBytes(flushThresholdSegmentSizeStr);
+ } catch (Exception e) {
+ LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key,
flushThresholdSegmentSizeStr,
+
DataSizeUtils.fromBytes(DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES));
+ }
+ }
+ if (segmentSizeBytes > 0) {
+ return segmentSizeBytes;
+ } else {
+ return DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES;
+ }
+ }
+
protected int extractFlushThresholdRows(Map<String, String> streamConfigMap)
{
- String flushThresholdRowsStr =
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS;
+ String flushThresholdRowsStr = streamConfigMap.get(key);
+ if (flushThresholdRowsStr == null) {
+ // for backward compatibility with older property
+ key = StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS;
+ flushThresholdRowsStr = streamConfigMap.get(key);
+ }
if (flushThresholdRowsStr != null) {
try {
int flushThresholdRows = Integer.parseInt(flushThresholdRowsStr);
@@ -208,8 +224,8 @@ public class StreamConfig {
Preconditions.checkState(flushThresholdRows >= 0);
return flushThresholdRows;
} catch (Exception e) {
- LOGGER.warn("Invalid config {}: {}, defaulting to: {}",
StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
- flushThresholdRowsStr, DEFAULT_FLUSH_THRESHOLD_ROWS);
+ LOGGER
+ .warn("Invalid config {}: {}, defaulting to: {}", key,
flushThresholdRowsStr, DEFAULT_FLUSH_THRESHOLD_ROWS);
return DEFAULT_FLUSH_THRESHOLD_ROWS;
}
} else {
@@ -289,8 +305,8 @@ public class StreamConfig {
return _flushThresholdTimeMillis;
}
- public long getFlushSegmentDesiredSizeBytes() {
- return _flushSegmentDesiredSizeBytes;
+ public long getFlushThresholdSegmentSizeBytes() {
+ return _flushThresholdSegmentSizeBytes;
}
public int getFlushAutotuneInitialRows() {
@@ -315,7 +331,7 @@ public class StreamConfig {
+ _consumerTypes + ", _consumerFactoryClassName='" +
_consumerFactoryClassName + '\'' + ", _offsetCriteria='"
+ _offsetCriteria + '\'' + ", _connectionTimeoutMillis=" +
_connectionTimeoutMillis + ", _fetchTimeoutMillis="
+ _fetchTimeoutMillis + ", _flushThresholdRows=" + _flushThresholdRows
+ ", _flushThresholdTimeMillis="
- + _flushThresholdTimeMillis + ", _flushSegmentDesiredSizeBytes=" +
_flushSegmentDesiredSizeBytes
+ + _flushThresholdTimeMillis + ", _flushSegmentDesiredSizeBytes=" +
_flushThresholdSegmentSizeBytes
+ ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ",
_decoderClass='" + _decoderClass + '\''
+ ", _decoderProperties=" + _decoderProperties + ", _groupId='" +
_groupId + ", _tableNameWithType='"
+ _tableNameWithType + '}';
@@ -337,7 +353,7 @@ public class StreamConfig {
.isEqual(_fetchTimeoutMillis, that._fetchTimeoutMillis) &&
EqualityUtils
.isEqual(_flushThresholdRows, that._flushThresholdRows) &&
EqualityUtils
.isEqual(_flushThresholdTimeMillis, that._flushThresholdTimeMillis) &&
EqualityUtils
- .isEqual(_flushSegmentDesiredSizeBytes,
that._flushSegmentDesiredSizeBytes) && EqualityUtils
+ .isEqual(_flushThresholdSegmentSizeBytes,
that._flushThresholdSegmentSizeBytes) && EqualityUtils
.isEqual(_flushAutotuneInitialRows, that._flushAutotuneInitialRows) &&
EqualityUtils.isEqual(_type, that._type)
&& EqualityUtils.isEqual(_topicName, that._topicName) && EqualityUtils
.isEqual(_consumerTypes, that._consumerTypes) && EqualityUtils
@@ -359,7 +375,7 @@ public class StreamConfig {
result = EqualityUtils.hashCodeOf(result, _fetchTimeoutMillis);
result = EqualityUtils.hashCodeOf(result, _flushThresholdRows);
result = EqualityUtils.hashCodeOf(result, _flushThresholdTimeMillis);
- result = EqualityUtils.hashCodeOf(result, _flushSegmentDesiredSizeBytes);
+ result = EqualityUtils.hashCodeOf(result, _flushThresholdSegmentSizeBytes);
result = EqualityUtils.hashCodeOf(result, _flushAutotuneInitialRows);
result = EqualityUtils.hashCodeOf(result, _decoderClass);
result = EqualityUtils.hashCodeOf(result, _decoderProperties);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 8284611..ffe92dc 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -51,6 +51,8 @@ public class StreamConfigProperties {
public static final String SEGMENT_FLUSH_THRESHOLD_TIME =
"realtime.segment.flush.threshold.time";
/**
+ * @deprecated because the property key is confusing (says size but is
actually rows). Use {@link StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_ROWS}
+ *
* Row count flush threshold for realtime segments. This behaves in a
similar way for HLC and LLC. For HLC,
* since there is only one consumer per server, this size is used as the
size of the consumption buffer and
* determines after how many rows we flush to disk. For example, if this
threshold is set to two million rows,
@@ -69,9 +71,12 @@ public class StreamConfigProperties {
* the size of the completed segment is the desired size (see
REALTIME_DESIRED_SEGMENT_SIZE), unless
* REALTIME_SEGMENT_FLUSH_TIME is reached first)
*/
- public static final String SEGMENT_FLUSH_THRESHOLD_ROWS =
"realtime.segment.flush.threshold.size";
+ public static final String DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS =
"realtime.segment.flush.threshold.size";
+ public static final String SEGMENT_FLUSH_THRESHOLD_ROWS =
"realtime.segment.flush.threshold.rows";
- /*
+ /**
+ * @deprecated because the property key is confusing (desired size is not
indicative of segment size). Use {@link
StreamConfigProperties#SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE}
+ *
* The desired size of a completed realtime segment.
* This config is used only if REALTIME_SEGMENT_FLUSH_SIZE is set
* to 0. Default value of REALTIME_SEGMENT_FLUSH_SIZE is "200M". Values are
parsed using DataSize class.
@@ -88,13 +93,13 @@ public class StreamConfigProperties {
*
* Not included here is any heap memory used (currently inverted index uses
heap memory for consuming partitions).
*/
- public static final String SEGMENT_FLUSH_DESIRED_SIZE =
"realtime.segment.flush.desired.size";
+ public static final String DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE =
"realtime.segment.flush.desired.size";
+ public static final String SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE =
"realtime.segment.flush.threshold.segment.size";
/**
* The initial num rows to use for segment size auto tuning. By default
100_000 is used.
*/
public static final String SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS =
"realtime.segment.flush.autotune.initialRows";
-
// Time threshold that controller will wait for the segment to be built by
the server
public static final String SEGMENT_COMMIT_TIMEOUT_SECONDS =
"realtime.segment.commit.timeoutSeconds";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]