mapshen opened a new issue #7849:
URL: https://github.com/apache/pinot/issues/7849
We recently found that querying Pinot `upsert`-enabled tables can return a
different number of rows at different times. The row count returned can be
higher than, equal to, or lower than the true count, whereas we expect it to
always match the true count.
## Setup
### Versions
Pinot Version: 0.8.0
Trino Version: 362
### Kafka Topic
Topic `topic1` has a single partition, and the publisher sends over 1,000
messages per second.
### TableConfig
The table has about 200 columns: 100 metric fields and 100 dimension fields.
Column `A` serves as the primary key with ~20,000 unique values. The segment
flush threshold is set to 10,000 rows, which at this publish rate means a new
segment is produced roughly every 10 seconds.
```
{
  "tableName": "table1",
  "schema": {
    "metricFieldSpecs": [
      {
        "name": "B",
        "dataType": "DOUBLE"
      }
    ],
    "dimensionFieldSpecs": [
      {
        "name": "A",
        "dataType": "STRING"
      }
    ],
    "dateTimeFieldSpecs": [
      {
        "name": "EPOCH",
        "dataType": "INT",
        "format": "1:SECONDS:EPOCH",
        "granularity": "1:SECONDS"
      }
    ],
    "primaryKeyColumns": [
      "A"
    ],
    "schemaName": "schema1"
  },
  "realtime": {
    "tableName": "table1",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "schema1",
      "timeColumnName": "EPOCH",
      "replicasPerPartition": "1",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "4",
      "segmentPushType": "APPEND",
      "completionConfig": {
        "completionMode": "DOWNLOAD"
      }
    },
    "tableIndexConfig": {
      "invertedIndexColumns": [
        "A"
      ],
      "loadMode": "MMAP",
      "nullHandlingEnabled": false,
      "streamConfigs": {
        "realtime.segment.flush.threshold.rows": "10000",
        "realtime.segment.flush.threshold.time": "96h",
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowLevel",
        "stream.kafka.topic.name": "topic1",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": "kafka:9092",
        "stream.kafka.consumer.prop.auto.offset.reset": "largest"
      }
    },
    "tenants": {},
    "metadata": {},
    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    },
    "upsertConfig": {
      "mode": "FULL"
    }
  }
}
```
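As a rough check on the flush interval quoted in the setup, the arithmetic can be sketched as follows. The helper name `flush_interval_seconds` is made up for illustration and is not part of Pinot. Because the publisher sends *over* 1,000 messages per second, the results are upper bounds on the actual interval.

```python
def flush_interval_seconds(flush_threshold_rows: int, messages_per_second: float) -> float:
    """Approximate seconds between segment flushes for a single-partition topic.

    Hypothetical helper: assumes a steady publish rate and that only the
    row threshold (not the time threshold) triggers the flush.
    """
    if messages_per_second <= 0:
        raise ValueError("messages_per_second must be positive")
    return flush_threshold_rows / messages_per_second


# Threshold from the table config above, at 1,000 messages per second.
print(flush_interval_seconds(10_000, 1_000))     # 10.0
# Thresholds used in Case 2 and Case 1 of this issue.
print(flush_interval_seconds(1_000, 1_000))      # 1.0
print(flush_interval_seconds(1_000_000, 1_000))  # 1000.0
```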
## Issues
### Case 1: one or two rows are missing
#### Steps to reproduce
1. Bump up the segment flush threshold to a large enough number, like
1,000,000, to make sure there is no flush.
2. Run `select count(*) from table1` in PQL repeatedly and you will
periodically see numbers like 19,998 or 19,997 while we expect 20,000.
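The repeated querying in step 2 can be scripted. The sketch below only shows the tallying logic: `run_count_query` is a placeholder for whatever client call executes `select count(*) from table1`, and `fake_query` is a stand-in that replays made-up results shaped like the ones reported here, so the example runs without a cluster.

```python
from collections import Counter
from typing import Callable, Dict


def tally_counts(run_count_query: Callable[[], int], attempts: int) -> Counter:
    """Run the count query `attempts` times and tally each distinct result."""
    return Counter(run_count_query() for _ in range(attempts))


def deviations(tally: Counter, expected: int) -> Dict[int, int]:
    """Return only the observed counts that differ from the expected count."""
    return {count: n for count, n in tally.items() if count != expected}


# Stand-in for a real client call; the values are illustrative, not measured.
_fake_results = iter([20000, 19998, 20000, 19997, 20000])


def fake_query() -> int:
    return next(_fake_results)


tally = tally_counts(fake_query, attempts=5)
print(deviations(tally, expected=20000))  # {19998: 1, 19997: 1}
```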
#### Root cause
The process of updating an `upsert` table segment is not atomic; it uses two
separate steps, `remove` and `add`. A query that runs between the two steps
misses the row being updated. #7844 by @Jackie-Jiang addresses it.
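To illustrate why a two-step update can make a row briefly disappear, here is a toy model. It is not Pinot's implementation and does not describe what #7844 changes; the dict `valid` and both function names are hypothetical. Each function records the row count a query would see at every point where it could interleave with the update.

```python
def upsert_two_step(valid: dict, key: str, doc_id: int) -> list:
    """Update via remove-then-add; return the counts visible at each step."""
    observed = [len(valid)]      # before the update starts
    valid.pop(key, None)         # step 1: remove the old record
    observed.append(len(valid))  # a query here misses the key entirely
    valid[key] = doc_id          # step 2: add the new record
    observed.append(len(valid))  # after the update finishes
    return observed


def upsert_single_step(valid: dict, key: str, doc_id: int) -> list:
    """Update via one replace; there is no intermediate state to observe."""
    observed = [len(valid)]      # before the update
    valid[key] = doc_id          # replace the old doc id in one step
    observed.append(len(valid))  # after the update
    return observed


# 20,000 primary keys, matching the cardinality of column A in this issue.
valid = {f"key{i}": i for i in range(20_000)}
print(upsert_two_step(valid, "key42", 20_000))     # [20000, 19999, 20000]
print(upsert_single_step(valid, "key42", 20_001))  # [20000, 20000]
```

With several updates in flight at once, the same gap would account for counts such as 19,998 or 19,997.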
### Case 2: hundreds or thousands of rows are missing
#### Steps to reproduce
1. Set the segment flush threshold to a small enough number, like 1,000, so
that segment flushes happen frequently.
2. Run `select count(*) from table1` in PQL repeatedly and, whenever a segment
is flushed, you will see numbers like 16,703 or 18,234 while we expect 20,000.
#### Root cause
Unclear. Is the upsert across segments not atomic?
### Case 3: duplicates are returned
#### Steps to reproduce
1. Continue with the setup in Case 2.
2. Run query1 `select count(*) from table1 where from_unixtime(EPOCH) >
current_timestamp - interval '15' minute` via Trino repeatedly and you will see
numbers like 20,023 when you expect to see 18,000, which means over 2,000
duplicates are returned.
#### Root cause
We happened to notice that an equivalent Trino query2 `select count(*) from
table1 where EPOCH > to_unixtime(current_timestamp - interval '15' minute)`
doesn't yield duplicates. The difference is that query2 utilizes the pushdown
support but query1 doesn't. We suspect that when query1 is executed, it examines
segments one by one and no locking is in place. For instance, it may first pull
out all the valid records from segment1, after which the locations of those
valid records are updated to segment2. When it then comes to segment2, it
retrieves all the valid records there as well. In the end, it returns a union of
the records from both segment1 and segment2, which contains duplicates.
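The suspected interleaving can be reproduced in a toy model. This only illustrates the hypothesis above; it is not how Pinot or the Trino connector is known to behave. The segments are plain sets of valid primary keys, and `scan_without_snapshot` and `ingest_updates` are hypothetical names.

```python
from typing import Callable, List, Set


def scan_without_snapshot(segments: List[Set[str]],
                          between_segments: Callable[[], None]) -> List[str]:
    """Read each segment's valid keys in turn, with no lock or snapshot.

    `between_segments` runs after each segment except the last and stands
    in for ingestion happening concurrently with the scan.
    """
    rows: List[str] = []
    for i, segment in enumerate(segments):
        rows.extend(sorted(segment))
        if i < len(segments) - 1:
            between_segments()
    return rows


segment1 = {"k1", "k2", "k3"}
segment2: Set[str] = set()


def ingest_updates() -> None:
    # Newer records for k1 and k2 arrive, so their valid location moves
    # from segment1 to segment2 after segment1 has already been read.
    for key in ("k1", "k2"):
        segment1.discard(key)
        segment2.add(key)


rows = scan_without_snapshot([segment1, segment2], ingest_updates)
print(len(rows), len(set(rows)))  # 5 3: k1 and k2 are counted twice
```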
CC: @mayankshriv @Jackie-Jiang @yupeng9 @elonazoulay