mapshen opened a new issue #7849:
URL: https://github.com/apache/pinot/issues/7849


   We recently found that querying Pinot `upsert`-enabled tables can return different numbers of rows at different times. The row count returned can be higher than, equal to, or lower than the true count, whereas we expect it to always match the true count.
   
   ## Setup
   
   ### Versions
   Pinot Version: 0.8.0
   Trino Version: 362
   
   ### Kafka Topic
   Topic `topic1` has a single partition, and the publisher sends over 1,000 messages per second.
   
   ### TableConfig
   The table has about 200 columns: roughly 100 metric fields and 100 dimension fields. Column `A` serves as the primary key and has ~20,000 unique values. The segment flush threshold is set to 10,000 rows, so at ~1,000 messages per second a new segment is produced roughly every 10 seconds.
   
   ```
   {
      "tableName":"table1",
      "schema":{
         "metricFieldSpecs":[
            {
               "name":"B",
               "dataType":"DOUBLE"
            }
         ],
         "dimensionFieldSpecs":[
            {
               "name":"A",
               "dataType":"STRING"
            }
         ],
         "dateTimeFieldSpecs":[
            {
               "name":"EPOCH",
               "dataType":"INT",
               "format":"1:SECONDS:EPOCH",
               "granularity":"1:SECONDS"
            }
         ],
         "primaryKeyColumns":[
            "A"
         ],
         "schemaName":"schema1"
      },
      "realtime":{
         "tableName":"table1",
         "tableType":"REALTIME",
         "segmentsConfig":{
            "schemaName":"schema1",
            "timeColumnName":"EPOCH",
            "replicasPerPartition":"1",
            "retentionTimeUnit":"DAYS",
            "retentionTimeValue":"4",
            "segmentPushType":"APPEND",
            "completionConfig":{
               "completionMode":"DOWNLOAD"
            }
         },
         "tableIndexConfig":{
            "invertedIndexColumns":[
               "A"
            ],
            "loadMode":"MMAP",
            "nullHandlingEnabled":false,
            "streamConfigs":{
               "realtime.segment.flush.threshold.rows":"10000",
               "realtime.segment.flush.threshold.time":"96h",
               "streamType":"kafka",
               "stream.kafka.consumer.type":"lowLevel",
               "stream.kafka.topic.name":"topic1",
               
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
               
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
               "stream.kafka.broker.list":"kafka:9092",
               "stream.kafka.consumer.prop.auto.offset.reset":"largest"
            }
         },
         "tenants":{
            
         },
         "metadata":{
            
         },
         "routing":{
            "instanceSelectorType":"strictReplicaGroup"
         },
         "upsertConfig":{
            "mode":"FULL"
         }
      }
   }
   ```
   
   ## Issues
   ### Case 1: one or two rows are missing
   
   #### Steps to reproduce
   1. Bump up the segment flush threshold to a large enough number, such as 1,000,000, to make sure no flush happens.
   2. Run `select count(*) from table1` in PQL repeatedly, and you will periodically see numbers like 19,998 or 19,997 while we expect 20,000 (see the sketch after this list).
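   
   For reference, a minimal way to watch the count; the second query is a hypothetical sanity check on our side (an assumption, not part of the original repro), using Pinot's `DISTINCTCOUNT` aggregation:
   
   ```
   -- With upsert in FULL mode and ~20,000 unique values of the primary key A,
   -- this should always return 20,000; values such as 19,997 or 19,998 indicate
   -- rows that are transiently invisible.
   SELECT COUNT(*) FROM table1;
   
   -- Hypothetical sanity check: the number of distinct primary keys visible to
   -- the query, for comparison against COUNT(*) above.
   SELECT DISTINCTCOUNT(A) FROM table1;
   ```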
   
   #### Root cause 
   Updating a record's location in an `upsert` table segment is not atomic; it is performed in two steps, `remove` followed by `add`, so a query that runs between the two steps misses the record. #7844 by @Jackie-Jiang addresses it.
   
   ### Case 2: hundreds or thousands of rows are missing
   
   #### Steps to reproduce
   1. Set the segment flush threshold to a small enough number, such as 1,000, so that segment flushes happen frequently.
   2. Run `select count(*) from table1` in PQL repeatedly, and around the time a segment is flushed you will see numbers like 16,703 or 18,234 while we expect 20,000 (a per-segment breakdown sketch follows this list).
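   
   One way to see where the rows go during a flush is to break the count down per segment. This is only a diagnostic sketch on our side, assuming the built-in `$segmentName` virtual column can be used for grouping in 0.8.0:
   
   ```
   -- Hypothetical diagnostic (not part of the original repro): per-segment counts
   -- of the rows currently visible to queries. The counts should sum to 20,000;
   -- running this right after a segment commit shows whether the committed segment
   -- or the new consuming segment is the one missing rows.
   SELECT $segmentName, COUNT(*)
   FROM table1
   GROUP BY $segmentName
   LIMIT 100
   ```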
   
   #### Root cause
   Unclear. Is the upsert handling across segments not atomic?
   
   ### Case 3: duplicates are returned
   
   #### Steps to reproduce
   1. Continue with the setup in Case 2.
   2. Run query1 `select count(*) from table1 where from_unixtime(EPOCH) > current_timestamp - interval '15' minute` via Trino repeatedly, and you will see numbers like 20,023 when you expect to see 18,000, i.e. over 2,000 duplicate rows are returned.
   
   #### Root cause
   We happened to notice that an equivalent Trino query, query2 `select count(*) from table1 where EPOCH > to_unixtime(current_timestamp - interval '15' minute)`, doesn't yield duplicates. The difference is that query2 utilizes the pushdown support (the predicate compares the raw `EPOCH` column, so it can be pushed down to Pinot) while query1 doesn't. We suspect that when query1 is executed, the segments are scanned one by one with no locking in place: it may first pull all the valid records from segment1, after which those records' locations are updated to segment2; when the scan reaches segment2, it retrieves the same valid records again. In the end it returns a union of the records from both segment1 and segment2, which contains duplicates.
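   
   For reference, here are the two Trino queries side by side, reformatted from the text above; the comments reflect our suspicion about pushdown rather than confirmed connector behavior:
   
   ```
   -- query1: the filter wraps EPOCH in from_unixtime(), so (we suspect) it cannot be
   -- pushed down and the Pinot segments are scanned one by one; duplicates show up.
   select count(*)
   from table1
   where from_unixtime(EPOCH) > current_timestamp - interval '15' minute;
   
   -- query2: the filter compares the raw EPOCH column against a computed constant,
   -- so it can be pushed down to Pinot; no duplicates observed.
   select count(*)
   from table1
   where EPOCH > to_unixtime(current_timestamp - interval '15' minute);
   ```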
   
   CC: @mayankshriv @Jackie-Jiang @yupeng9 @elonazoulay 

