This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3892812 Add "num rows in segments" and "num segments queried per
host" to the output of Realtime Provisioning Rule (#7282)
3892812 is described below
commit 38928124496237402b2260c91fa170b2f3fa0779
Author: Sajjad Moradi <[email protected]>
AuthorDate: Wed Aug 25 18:41:16 2021 -0700
Add "num rows in segments" and "num segments queried per host" to the
output of Realtime Provisioning Rule (#7282)
* Fix issue with output of Realtime Prov. Rule
* Fix rebase conflict
* Resolve conflict on git rebase
* Add num rows for each segment size recommendation
* Add num segments queried per host to the output
* Fix "division by zero" issue with low ingestion rate use cases
* Make long line shorter
---
.../realtime/provisioning/MemoryEstimator.java | 14 +++-
.../rules/impl/RealtimeProvisioningRule.java | 31 ++++++++-
.../rules/impl/VariedLengthDictionaryRule.java | 2 +-
.../recommender/rules/io/configs/IndexConfig.java | 8 +--
.../controller/recommender/TestConfigEngine.java | 15 +++--
...ealtimeProvisioningInput_highIngestionRate.json | 75 ++++++++++++++++++++++
6 files changed, 130 insertions(+), 15 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
index e0cc02d..fd915e1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
*/
public class MemoryEstimator {
- private static final String NOT_APPLICABLE = "NA";
+ public static final String NOT_APPLICABLE = "NA";
private static final String STATS_FILE_NAME = "stats.ser";
private static final String STATS_FILE_COPY_NAME = "stats.copy.ser";
@@ -95,13 +95,14 @@ public class MemoryEstimator {
private String[][] _activeMemoryPerHost;
private String[][] _optimalSegmentSize;
+ private String[][] _numRowsInSegment;
private String[][] _consumingMemoryPerHost;
private String[][] _numSegmentsQueriedPerHost;
/**
* Constructor used for processing the given completed segment
*/
- public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment,
int ingestionRatePerPartition,
+ public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment,
double ingestionRatePerPartition,
long maxUsableHostMemory, int tableRetentionHours, File workingDir) {
_maxUsableHostMemory = maxUsableHostMemory;
_tableConfig = tableConfig;
@@ -135,7 +136,7 @@ public class MemoryEstimator {
* Constructor used for processing the given data characteristics (instead
of completed segment)
*/
public MemoryEstimator(TableConfig tableConfig, Schema schema,
SchemaWithMetaData schemaWithMetadata,
- int numberOfRows, int ingestionRatePerPartition, long
maxUsableHostMemory, int tableRetentionHours,
+ int numberOfRows, double ingestionRatePerPartition, long
maxUsableHostMemory, int tableRetentionHours,
File workingDir) {
this(tableConfig, generateCompletedSegment(schemaWithMetadata, schema,
tableConfig, numberOfRows, workingDir),
ingestionRatePerPartition, maxUsableHostMemory, tableRetentionHours,
workingDir);
@@ -240,12 +241,14 @@ public class MemoryEstimator {
throws IOException {
_activeMemoryPerHost = new String[numHours.length][numHosts.length];
_optimalSegmentSize = new String[numHours.length][numHosts.length];
+ _numRowsInSegment = new String[numHours.length][numHosts.length];
_consumingMemoryPerHost = new String[numHours.length][numHosts.length];
_numSegmentsQueriedPerHost = new String[numHours.length][numHosts.length];
for (int i = 0; i < numHours.length; i++) {
for (int j = 0; j < numHosts.length; j++) {
_activeMemoryPerHost[i][j] = NOT_APPLICABLE;
_consumingMemoryPerHost[i][j] = NOT_APPLICABLE;
+ _numRowsInSegment[i][j] = NOT_APPLICABLE;
_optimalSegmentSize[i][j] = NOT_APPLICABLE;
_numSegmentsQueriedPerHost[i][j] = NOT_APPLICABLE;
}
@@ -296,6 +299,7 @@ public class MemoryEstimator {
DataSizeUtils.fromBytes(activeMemoryPerHostBytes) + "/" +
DataSizeUtils.fromBytes(mappedMemoryPerHost);
_consumingMemoryPerHost[i][j] =
DataSizeUtils.fromBytes(totalMemoryForConsumingSegmentsPerHost);
_optimalSegmentSize[i][j] =
DataSizeUtils.fromBytes(completedSegmentSizeBytes);
+ _numRowsInSegment[i][j] = String.valueOf(totalDocs);
_numSegmentsQueriedPerHost[i][j] =
String.valueOf(numActiveSegmentsPerPartition *
totalConsumingPartitionsPerHost);
}
@@ -431,6 +435,10 @@ public class MemoryEstimator {
return _optimalSegmentSize;
}
+ public String[][] getNumRowsInSegment() {
+ return _numRowsInSegment;
+ }
+
public String[][] getConsumingMemoryPerHost() {
return _consumingMemoryPerHost;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
index 7096186..d505cfb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
@@ -43,6 +43,8 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import static
org.apache.pinot.controller.recommender.realtime.provisioning.MemoryEstimator.NOT_APPLICABLE;
+
/**
* This rule gives some recommendations useful for provisioning real time
tables. Specifically it provides some
@@ -51,6 +53,8 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
*/
public class RealtimeProvisioningRule extends AbstractRule {
public static final String OPTIMAL_SEGMENT_SIZE = "Optimal Segment Size";
+ public static final String NUM_ROWS_IN_SEGMENT = "Number of Rows in Segment";
+ public static final String NUM_SEGMENTS_QUERIED_PER_HOST = "Number of
Segments Queried per Host";
public static final String CONSUMING_MEMORY_PER_HOST = "Consuming Memory per
Host";
public static final String TOTAL_MEMORY_USED_PER_HOST = "Total Memory Used
per Host";
@@ -75,7 +79,7 @@ public class RealtimeProvisioningRule extends AbstractRule {
createTableConfig(_output.getIndexConfig(), _input.getSchema(),
_output.isAggregateMetrics());
long maxUsableHostMemoryByte =
DataSizeUtils.toBytes(_params.getMaxUsableHostMemory());
int totalConsumingPartitions = _params.getNumPartitions() *
_params.getNumReplicas();
- int ingestionRatePerPartition = (int)
_input.getNumMessagesPerSecInKafkaTopic() / _params.getNumPartitions();
+ double ingestionRatePerPartition = (double)
_input.getNumMessagesPerSecInKafkaTopic() / _params.getNumPartitions();
int retentionHours = _params.getRealtimeTableRetentionHours();
int[] numHosts = _params.getNumHosts();
int[] numHours = _params.getNumHours();
@@ -108,7 +112,7 @@ public class RealtimeProvisioningRule extends AbstractRule {
setIfNotEmpty(indexConfig.getNoDictionaryColumns(),
tableConfigBuilder::setNoDictionaryColumns);
setIfNotEmpty(indexConfig.getInvertedIndexColumns(),
tableConfigBuilder::setInvertedIndexColumns);
setIfNotEmpty(indexConfig.getOnHeapDictionaryColumns(),
tableConfigBuilder::setOnHeapDictionaryColumns);
- setIfNotEmpty(indexConfig.getVariedLengthDictionaryColumns(),
tableConfigBuilder::setVarLengthDictionaryColumns);
+ setIfNotEmpty(indexConfig.getVarLengthDictionaryColumns(),
tableConfigBuilder::setVarLengthDictionaryColumns);
TableConfig tableConfig = tableConfigBuilder.build();
tableConfig.getIndexingConfig().setAggregateMetrics(aggregateMetrics);
@@ -131,9 +135,16 @@ public class RealtimeProvisioningRule extends AbstractRule
{
Map<String, Map<String, String>> rtProvRecommendations) {
Map<String, String> segmentSizes =
makeMatrix(memoryEstimator.getOptimalSegmentSize(), numHosts, numHours);
Map<String, String> consumingMemory =
makeMatrix(memoryEstimator.getConsumingMemoryPerHost(), numHosts, numHours);
+ Map<String, String> numSegmentsQueried =
+ makeMatrix(memoryEstimator.getNumSegmentsQueriedPerHost(), numHosts,
numHours);
+ Map<String, String> numRowsInSegment =
makeMatrix(memoryEstimator.getNumRowsInSegment(), numHosts, numHours,
+ element -> element.equals(NOT_APPLICABLE) ? element :
convertLargeNumberToHumanReadable(element));
Map<String, String> totalMemory =
makeMatrix(memoryEstimator.getActiveMemoryPerHost(), numHosts, numHours,
- element -> element.substring(0, element.indexOf('/'))); // take the
first number (eg: 48G/48G)
+ element -> element.equals(NOT_APPLICABLE) ? element
+ : element.substring(0, element.indexOf('/'))); // take the first
number (eg: 48G/48G)
rtProvRecommendations.put(OPTIMAL_SEGMENT_SIZE, segmentSizes);
+ rtProvRecommendations.put(NUM_ROWS_IN_SEGMENT, numRowsInSegment);
+ rtProvRecommendations.put(NUM_SEGMENTS_QUERIED_PER_HOST,
numSegmentsQueried);
rtProvRecommendations.put(CONSUMING_MEMORY_PER_HOST, consumingMemory);
rtProvRecommendations.put(TOTAL_MEMORY_USED_PER_HOST, totalMemory);
}
@@ -173,4 +184,18 @@ public class RealtimeProvisioningRule extends AbstractRule
{
return output;
}
+
+ private String convertLargeNumberToHumanReadable(String num) {
+ int val = Integer.parseInt(num);
+ if (val >= 10_000_000) {
+ return (val / 1_000_000) + "M";
+ }
+ if (val >= 1_000_000) {
+ return (val / 100_000) / 10.0 + "M"; // eg: 5,432,000 -> 5.4M
+ }
+ if (val >= 10_000) {
+ return (val / 1000) + "K";
+ }
+ return num;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
index b530601..7573591 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
@@ -44,7 +44,7 @@ public class VariedLengthDictionaryRule extends AbstractRule {
LOGGER.debug("{} {}", _input.getFieldType(colName), colName);
if (_input.getFieldType(colName) == FieldSpec.DataType.STRING
|| _input.getFieldType(colName) == FieldSpec.DataType.BYTES) {
-
_output.getIndexConfig().getVariedLengthDictionaryColumns().add(colName);
+
_output.getIndexConfig().getVarLengthDictionaryColumns().add(colName);
}
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
index 2f17615..160491a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
@@ -36,13 +36,13 @@ public class IndexConfig {
Set<String> _noDictionaryColumns = new HashSet<>();
Set<String> _onHeapDictionaryColumns = new HashSet<>();
- Set<String> _variedLengthDictionaryColumns = new HashSet<>();
+ Set<String> _varLengthDictionaryColumns = new HashSet<>();
boolean _isSortedColumnOverwritten = false;
@JsonSetter(nulls = Nulls.SKIP)
public void setVariedLengthDictionaryColumns(Set<String>
variedLengthDictionaryColumns) {
- _variedLengthDictionaryColumns = variedLengthDictionaryColumns;
+ _varLengthDictionaryColumns = variedLengthDictionaryColumns;
}
@JsonSetter(nulls = Nulls.SKIP)
@@ -84,8 +84,8 @@ public class IndexConfig {
_isSortedColumnOverwritten = sortedColumnOverwritten;
}
- public Set<String> getVariedLengthDictionaryColumns() {
- return _variedLengthDictionaryColumns;
+ public Set<String> getVarLengthDictionaryColumns() {
+ return _varLengthDictionaryColumns;
}
public Set<String> getBloomFilterColumns() {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
index 0af4635..57ede19 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
@@ -45,9 +45,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.CONSUMING_MEMORY_PER_HOST;
-import static
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.OPTIMAL_SEGMENT_SIZE;
-import static
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.TOTAL_MEMORY_USED_PER_HOST;
+import static
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.*;
import static org.testng.Assert.*;
@@ -231,7 +229,7 @@ public class TestConfigEngine {
AbstractRule abstractRule =
RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.VariedLengthDictionaryRule,
_input, output);
abstractRule.run();
-
assertEquals(output.getIndexConfig().getVariedLengthDictionaryColumns().toString(),
"[a, d, m]");
+
assertEquals(output.getIndexConfig().getVarLengthDictionaryColumns().toString(),
"[a, d, m]");
}
@Test
@@ -436,6 +434,13 @@ public class TestConfigEngine {
}
@Test
+ void testRealtimeProvisioningRuleWithHighIngestionRate() throws Exception {
+ // Total memory for some of the options is greater than the provided max
memory in a host.
+ // For those options, the returned value is "NA".
+
testRealtimeProvisioningRule("recommenderInput/RealtimeProvisioningInput_highIngestionRate.json");
+ }
+
+ @Test
void testAggregateMetricsRule()
throws Exception {
ConfigManager output =
runRecommenderDriver("recommenderInput/AggregateMetricsRuleInput.json");
@@ -487,6 +492,8 @@ public class TestConfigEngine {
ConfigManager output = runRecommenderDriver(fileName);
Map<String, Map<String, String>> recommendations =
output.getRealtimeProvisioningRecommendations();
assertRealtimeProvisioningRecommendation(recommendations.get(OPTIMAL_SEGMENT_SIZE));
+
assertRealtimeProvisioningRecommendation(recommendations.get(NUM_ROWS_IN_SEGMENT));
+
assertRealtimeProvisioningRecommendation(recommendations.get(NUM_SEGMENTS_QUERIED_PER_HOST));
assertRealtimeProvisioningRecommendation(recommendations.get(CONSUMING_MEMORY_PER_HOST));
assertRealtimeProvisioningRecommendation(recommendations.get(TOTAL_MEMORY_USED_PER_HOST));
}
diff --git
a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json
b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json
new file mode 100644
index 0000000..3f687be
--- /dev/null
+++
b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json
@@ -0,0 +1,75 @@
+{
+ "schema": {
+ "dateTimeFieldSpecs": [
+ {
+ "dataType": "LONG",
+ "format": "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS",
+ "name": "timestampMillis",
+ "cardinality": 10000
+ }
+ ],
+ "dimensionFieldSpecs": [
+ {
+ "averageLength": 8,
+ "cardinality": 16,
+ "dataType": "STRING",
+ "name": "colA"
+ },
+ {
+ "averageLength": 16,
+ "cardinality": 200,
+ "dataType": "STRING",
+ "name": "colB"
+ },
+ {
+ "averageLength": 50,
+ "cardinality": 100000,
+ "dataType": "STRING",
+ "name": "colC"
+ },
+ {
+ "averageLength": 25,
+ "cardinality": 5000,
+ "dataType": "INT",
+ "name": "partition"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "cardinality": 5000,
+ "dataType": "LONG",
+ "name": "metricA"
+ },
+ {
+ "cardinality": 5000,
+ "dataType": "LONG",
+ "name": "metricB"
+ }
+ ],
+ "schemaName": "myTable"
+ },
+ "queriesWithWeights":{
+ "select colC, \"partition\" as partitionNum, max(metricA) as
maxMetricA,avg(metricA) as avgMetricA, avg(metricB) as avgMetricB from myTable
where colA='valA' and timestampMillis > now() - 3600000 and colB='valB' and
timestampMillis < now() group by colB,colC,\"partition\" order by max(metricA)
desc limit 10000": 1,
+ "select colC, \"partition\" as partitionNum, max(metricA) as
maxMetricA,avg(metricA) as avgMetricA from myTable where colA='val1' and
timestampMillis > now() - 3600000 and colB='valB' and timestampMillis < now()
and maxMetricA > 2000 group by colB,colC,\"partition\" order by max(metricA)
desc limit 10000": 1,
+ "select timestampMillis, sum(metricB) from myTable where (colC='A' or
colC='B') and (timestampMillis >= 123) group by timestampMillis order by
timestampMillis asc": 1
+ },
+ "qps": 10,
+ "tableType": "REALTIME",
+ "latencySLA": 500,
+ "rulesToExecute": {
+ "recommendRealtimeProvisioning": true
+ },
+ "numMessagesPerSecInKafkaTopic":50000,
+ "realtimeProvisioningRuleParams": {
+ "numPartitions": 32,
+ "numReplicas": 2,
+ "realtimeTableRetentionHours": 24,
+ "maxUsableHostMemory": "150G",
+ "numHours": [2, 4, 6, 8, 10, 12],
+ "numHosts": [3, 6, 9, 12, 15, 18, 21],
+ "numRowsInGeneratedSegment": 10000
+ },
+ "overWrittenConfigs": {
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]