This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 079b86d Config for overriding initial rows threshold in segment size
auto tuning (#4376)
079b86d is described below
commit 079b86d897afb76a4328cb5a057282676b6a2213
Author: Neha Pawar <[email protected]>
AuthorDate: Fri Jul 12 11:09:49 2019 -0700
Config for overriding initial rows threshold in segment size auto tuning
(#4376)
---
.../segment/FlushThresholdUpdateManager.java | 11 ++---
.../SegmentSizeBasedFlushThresholdUpdater.java | 14 +++---
.../segment/FlushThresholdUpdaterTest.java | 28 +++++++----
.../pinot/core/realtime/stream/StreamConfig.java | 55 ++++++++++++++++------
.../realtime/stream/StreamConfigProperties.java | 8 ++--
5 files changed, 76 insertions(+), 40 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index a5c44ab..e7cf2e7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
*/
public class FlushThresholdUpdateManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FlushThresholdUpdateManager.class);
-
private ConcurrentMap<String, FlushThresholdUpdater>
_flushThresholdUpdaterMap = new ConcurrentHashMap<>();
/**
@@ -41,8 +39,6 @@ public class FlushThresholdUpdateManager {
* If flush size < 0, create a new DefaultFlushThresholdUpdater with default
flush size
* If flush size > 0, create a new DefaultFlushThresholdUpdater with given
flush size.
* If flush size == 0, create new SegmentSizeBasedFlushThresholdUpdater if
not already created. Create only 1 per table, because we want to maintain
tuning information for the table in the updater
- * @param realtimeTableConfig
- * @return
*/
public FlushThresholdUpdater getFlushThresholdUpdater(TableConfig
realtimeTableConfig) {
final String tableName = realtimeTableConfig.getTableName();
@@ -50,11 +46,12 @@ public class FlushThresholdUpdateManager {
new
PartitionLevelStreamConfig(realtimeTableConfig.getIndexingConfig().getStreamConfigs());
final int tableFlushSize = streamConfig.getFlushThresholdRows();
- final long desiredSegmentSize =
streamConfig.getFlushSegmentDesiredSizeBytes();
if (tableFlushSize == 0) {
- return _flushThresholdUpdaterMap
- .computeIfAbsent(tableName, k -> new
SegmentSizeBasedFlushThresholdUpdater(desiredSegmentSize));
+ final long desiredSegmentSize =
streamConfig.getFlushSegmentDesiredSizeBytes();
+ final int flushAutotuneInitialRows =
streamConfig.getFlushAutotuneInitialRows();
+ return _flushThresholdUpdaterMap.computeIfAbsent(tableName,
+ k -> new SegmentSizeBasedFlushThresholdUpdater(desiredSegmentSize,
flushAutotuneInitialRows));
} else {
_flushThresholdUpdaterMap.remove(tableName);
return new DefaultFlushThresholdUpdater(tableFlushSize);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 116280b..d3c40c4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -39,14 +39,13 @@ public class SegmentSizeBasedFlushThresholdUpdater
implements FlushThresholdUpda
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
- private static final int INITIAL_ROWS_THRESHOLD = 100_000;
-
private static final double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1;
private static final double PREVIOUS_SEGMENT_RATIO_WEIGHT = 0.9;
private static final double ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT = 1.1;
private static final int MINIMUM_NUM_ROWS_THRESHOLD = 10_000;
private final long _desiredSegmentSizeBytes;
+ private final int _autotuneInitialRows;
/** Below this size, we double the rows threshold */
private final double _optimalSegmentSizeBytesMin;
@@ -54,8 +53,8 @@ public class SegmentSizeBasedFlushThresholdUpdater implements
FlushThresholdUpda
private final double _optimalSegmentSizeBytesMax;
@VisibleForTesting
- int getInitialRowsThreshold() {
- return INITIAL_ROWS_THRESHOLD;
+ int getAutotuneInitialRows() {
+ return _autotuneInitialRows;
}
@VisibleForTesting
@@ -81,10 +80,11 @@ public class SegmentSizeBasedFlushThresholdUpdater
implements FlushThresholdUpda
// num rows to segment size ratio of last committed segment for this table
private double _latestSegmentRowsToSizeRatio = 0;
- public SegmentSizeBasedFlushThresholdUpdater(long desiredSegmentSizeBytes) {
+ public SegmentSizeBasedFlushThresholdUpdater(long desiredSegmentSizeBytes,
int autotuneInitialRows) {
_desiredSegmentSizeBytes = desiredSegmentSizeBytes;
_optimalSegmentSizeBytesMin = _desiredSegmentSizeBytes / 2;
_optimalSegmentSizeBytesMax = _desiredSegmentSizeBytes * 1.5;
+ _autotuneInitialRows = autotuneInitialRows;
}
// synchronized since this method could be called for multiple partitions of
the same table in different threads
@@ -104,8 +104,8 @@ public class SegmentSizeBasedFlushThresholdUpdater
implements FlushThresholdUpda
newSegmentZKMetadata.setSizeThresholdToFlushSegment((int)
targetSegmentNumRows);
} else {
LOGGER.info("Committing segment zk metadata is not available, setting
threshold for {} as {}", newSegmentName,
- INITIAL_ROWS_THRESHOLD);
-
newSegmentZKMetadata.setSizeThresholdToFlushSegment(INITIAL_ROWS_THRESHOLD);
+ _autotuneInitialRows);
+
newSegmentZKMetadata.setSizeThresholdToFlushSegment(_autotuneInitialRows);
}
return;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 425ebff..b539565 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -38,6 +38,7 @@ import org.testng.annotations.Test;
public class FlushThresholdUpdaterTest {
private static final long DESIRED_SEGMENT_SIZE =
StreamConfig.getDefaultDesiredSegmentSizeBytes();
+ private static final int DEFAULT_INITIAL_ROWS_THRESHOLD =
StreamConfig.getDefaultFlushAutotuneInitialRows();
private Random _random;
private Map<String, double[][]> datasetGraph;
@@ -155,7 +156,7 @@ public class FlushThresholdUpdaterTest {
for (Map.Entry<String, double[][]> entry : datasetGraph.entrySet()) {
SegmentSizeBasedFlushThresholdUpdater
segmentSizeBasedFlushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+ new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE,
DEFAULT_INITIAL_ROWS_THRESHOLD);
double[][] numRowsToSegmentSize = entry.getValue();
@@ -185,7 +186,7 @@ public class FlushThresholdUpdaterTest {
segmentSizeBasedFlushThresholdUpdater
.updateFlushThreshold(newSegmentMetadata, null,
committingSegmentDescriptor, null);
Assert.assertEquals(newSegmentMetadata.getSizeThresholdToFlushSegment(),
- segmentSizeBasedFlushThresholdUpdater.getInitialRowsThreshold());
+ segmentSizeBasedFlushThresholdUpdater.getAutotuneInitialRows());
System.out.println("NumRowsThreshold, SegmentSize");
for (int run = 0; run < numRuns; run++) {
@@ -310,7 +311,7 @@ public class FlushThresholdUpdaterTest {
Assert.assertEquals(flushThresholdUpdater.getClass(),
DefaultFlushThresholdUpdater.class);
Assert.assertEquals(((DefaultFlushThresholdUpdater)
flushThresholdUpdater).getTableFlushSize(), 20000);
- // optimal segment size set to invalid value. Defailt remains the same.
+ // optimal segment size set to invalid value. Default remains the same.
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
"0");
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE,
"Invalid");
realtimeTableConfig = tableConfigBuilder.build();
@@ -332,6 +333,16 @@ public class FlushThresholdUpdaterTest {
flushThresholdUpdater =
manager.getFlushThresholdUpdater(realtimeTableConfig);
Assert.assertEquals(((SegmentSizeBasedFlushThresholdUpdater)
(flushThresholdUpdater)).getDesiredSegmentSizeBytes(),
desiredSegSize);
+ Assert.assertEquals(((SegmentSizeBasedFlushThresholdUpdater)
(flushThresholdUpdater)).getAutotuneInitialRows(),
+ DEFAULT_INITIAL_ROWS_THRESHOLD);
+
+ // initial rows threshold
+
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS,
"500000");
+ realtimeTableConfig = tableConfigBuilder.build();
+ FlushThresholdUpdateManager newManager = new FlushThresholdUpdateManager();
+ flushThresholdUpdater =
newManager.getFlushThresholdUpdater(realtimeTableConfig);
+ Assert.assertEquals(((SegmentSizeBasedFlushThresholdUpdater)
(flushThresholdUpdater)).getAutotuneInitialRows(),
+ 500_000);
}
/**
@@ -368,7 +379,8 @@ public class FlushThresholdUpdaterTest {
Assert.assertNull(metadata0.getTimeThresholdToFlushSegment());
// before committing segment, we switched to size based updation - verify
that new thresholds are set as per size based strategy
- flushThresholdUpdater = new
SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+ flushThresholdUpdater = new
SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE,
+ DEFAULT_INITIAL_ROWS_THRESHOLD);
startOffset += 1000;
updateCommittingSegmentMetadata(metadata0, startOffset, 250_000);
@@ -407,10 +419,10 @@ public class FlushThresholdUpdaterTest {
// initial segment
LLCRealtimeSegmentZKMetadata metadata0 = getNextSegmentMetadata(tableName,
startOffset, partitionId, seqNum++);
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+ new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE,
DEFAULT_INITIAL_ROWS_THRESHOLD);
committingSegmentDescriptor = new
CommittingSegmentDescriptor(metadata0.getSegmentName(), startOffset, 0);
flushThresholdUpdater.updateFlushThreshold(metadata0, null,
committingSegmentDescriptor, null);
- Assert.assertEquals(metadata0.getSizeThresholdToFlushSegment(),
flushThresholdUpdater.getInitialRowsThreshold());
+ Assert.assertEquals(metadata0.getSizeThresholdToFlushSegment(),
flushThresholdUpdater.getAutotuneInitialRows());
// next segment hit time threshold
startOffset += 1000;
@@ -451,7 +463,7 @@ public class FlushThresholdUpdaterTest {
LLCRealtimeSegmentZKMetadata metadata0 = getNextSegmentMetadata(tableName,
startOffset, partitionId, seqNum++);
metadata0.setSegmentName(seg0SegmentName.getSegmentName());
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+ new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE,
DEFAULT_INITIAL_ROWS_THRESHOLD);
committingSegmentDescriptor =
new CommittingSegmentDescriptor(seg0SegmentName.getSegmentName(),
startOffset, 10_000);
metadata0.setTotalRawDocs(15);
@@ -486,7 +498,7 @@ public class FlushThresholdUpdaterTest {
long seg0time = now - 1334_650;
long seg1time = seg0time + 14_000;
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+ new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE,
DEFAULT_INITIAL_ROWS_THRESHOLD);
// Initial update is from partition 1
LLCSegmentName seg0SegmentName = new LLCSegmentName(tableName, 1, seqNum,
seg0time);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
index 1d37534..ad65660 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
@@ -47,12 +47,13 @@ public class StreamConfig {
private static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
private static final long DEFAULT_FLUSH_THRESHOLD_TIME =
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
private static final long DEFAULT_DESIRED_SEGMENT_SIZE_BYTES = 200 * 1024 *
1024; // 200M
+ private static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
private static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
"org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory";
- protected static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS =
30_000;
- protected static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
- protected static final String SIMPLE_CONSUMER_TYPE_STRING = "simple";
+ static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 30_000;
+ static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
+ private static final String SIMPLE_CONSUMER_TYPE_STRING = "simple";
final private String _type;
final private String _topicName;
@@ -68,6 +69,7 @@ public class StreamConfig {
final private int _flushThresholdRows;
final private long _flushThresholdTimeMillis;
final private long _flushSegmentDesiredSizeBytes;
+ final private int _flushAutotuneInitialRows; // initial num rows to use for
SegmentSizeBasedFlushThresholdUpdater
final private String _groupId;
@@ -135,8 +137,8 @@ public class StreamConfig {
try {
connectionTimeoutMillis = Long.parseLong(connectionTimeoutValue);
} catch (Exception e) {
- LOGGER.warn("Caught exception while parsing the connection timeout,
defaulting to {} ms", e,
- DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+ LOGGER.warn("Caught exception while parsing the connection timeout,
defaulting to {} ms",
+ DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS, e);
}
}
_connectionTimeoutMillis = connectionTimeoutMillis;
@@ -197,6 +199,19 @@ public class StreamConfig {
_flushSegmentDesiredSizeBytes = DEFAULT_DESIRED_SEGMENT_SIZE_BYTES;
}
+ int autotuneInitialRows = 0;
+ String initialRowsValue =
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
+ if (initialRowsValue != null) {
+ try {
+ autotuneInitialRows = Integer.parseInt(initialRowsValue);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while parsing {}:{}, defaulting to {}",
+ StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS,
initialRowsValue,
+ DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS, e);
+ }
+ }
+ _flushAutotuneInitialRows = autotuneInitialRows > 0 ? autotuneInitialRows
: DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
+
String groupIdKey = StreamConfigProperties.constructStreamProperty(_type,
StreamConfigProperties.GROUP_ID);
_groupId = streamConfigMap.get(groupIdKey);
@@ -227,10 +242,22 @@ public class StreamConfig {
return _consumerFactoryClassName;
}
+ public static String getDefaultConsumerFactoryClassName() {
+ return DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING;
+ }
+
public OffsetCriteria getOffsetCriteria() {
return _offsetCriteria;
}
+ public String getDecoderClass() {
+ return _decoderClass;
+ }
+
+ public Map<String, String> getDecoderProperties() {
+ return _decoderProperties;
+ }
+
public long getConnectionTimeoutMillis() {
return _connectionTimeoutMillis;
}
@@ -255,10 +282,6 @@ public class StreamConfig {
return DEFAULT_FLUSH_THRESHOLD_TIME;
}
- public static String getDefaultConsumerFactoryClassName() {
- return DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING;
- }
-
public long getFlushSegmentDesiredSizeBytes() {
return _flushSegmentDesiredSizeBytes;
}
@@ -267,12 +290,12 @@ public class StreamConfig {
return DEFAULT_DESIRED_SEGMENT_SIZE_BYTES;
}
- public String getDecoderClass() {
- return _decoderClass;
+ public int getFlushAutotuneInitialRows() {
+ return _flushAutotuneInitialRows;
}
- public Map<String, String> getDecoderProperties() {
- return _decoderProperties;
+ public static int getDefaultFlushAutotuneInitialRows() {
+ return DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
}
public String getGroupId() {
@@ -290,8 +313,8 @@ public class StreamConfig {
+ _offsetCriteria + '\'' + ", _connectionTimeoutMillis=" +
_connectionTimeoutMillis + ", _fetchTimeoutMillis="
+ _fetchTimeoutMillis + ", _flushThresholdRows=" + _flushThresholdRows
+ ", _flushThresholdTimeMillis="
+ _flushThresholdTimeMillis + ", _flushSegmentDesiredSizeBytes=" +
_flushSegmentDesiredSizeBytes
- + ", _decoderClass='" + _decoderClass + '\'' + ", _decoderProperties="
+ _decoderProperties + ", _groupId='"
- + _groupId + '}';
+ + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ",
_decoderClass='" + _decoderClass
+ + '\'' + ", _decoderProperties=" + _decoderProperties + ", _groupId='"
+ _groupId + '}';
}
@Override
@@ -311,6 +334,7 @@ public class StreamConfig {
.isEqual(_flushThresholdRows, that._flushThresholdRows) &&
EqualityUtils
.isEqual(_flushThresholdTimeMillis, that._flushThresholdTimeMillis) &&
EqualityUtils
.isEqual(_flushSegmentDesiredSizeBytes,
that._flushSegmentDesiredSizeBytes) && EqualityUtils
+ .isEqual(_flushAutotuneInitialRows, that._flushAutotuneInitialRows) &&
EqualityUtils
.isEqual(_type, that._type) && EqualityUtils.isEqual(_topicName,
that._topicName) && EqualityUtils
.isEqual(_consumerTypes, that._consumerTypes) && EqualityUtils
.isEqual(_consumerFactoryClassName, that._consumerFactoryClassName) &&
EqualityUtils
@@ -331,6 +355,7 @@ public class StreamConfig {
result = EqualityUtils.hashCodeOf(result, _flushThresholdRows);
result = EqualityUtils.hashCodeOf(result, _flushThresholdTimeMillis);
result = EqualityUtils.hashCodeOf(result, _flushSegmentDesiredSizeBytes);
+ result = EqualityUtils.hashCodeOf(result, _flushAutotuneInitialRows);
result = EqualityUtils.hashCodeOf(result, _decoderClass);
result = EqualityUtils.hashCodeOf(result, _decoderProperties);
result = EqualityUtils.hashCodeOf(result, _groupId);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
index 6619213..5b4cbf1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
@@ -89,14 +89,16 @@ public class StreamConfigProperties {
*/
public static final String SEGMENT_FLUSH_DESIRED_SIZE =
"realtime.segment.flush.desired.size";
+ /**
+ * The initial num rows to use for segment size auto tuning. By default
100_000 is used.
+ */
+ public static final String SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS =
"realtime.segment.flush.autotune.initialRows";
+
// Time threshold that controller will wait for the segment to be built by
the server
public static final String SEGMENT_COMMIT_TIMEOUT_SECONDS =
"realtime.segment.commit.timeoutSeconds";
/**
* Helper method to create a stream specific property
- * @param streamType
- * @param property
- * @return
*/
public static String constructStreamProperty(String streamType, String
property) {
return Joiner.on(DOT_SEPARATOR).join(STREAM_PREFIX, streamType, property);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]