[ 
https://issues.apache.org/jira/browse/DRILL-7998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Chen updated DRILL-7998:
-------------------------------
    Description: 
My team and I have experienced two scenarios in which querying Kafka results in 
an incorrect result set. I'm unsure of whether they have the same root cause.

*Case 1:*
Queries with an ORDER BY clause on kafkaMsgTimestamp return incorrect 
results.

topic_1 is a topic with 2 partitions, no log compaction, in JSON format.

 
{code:java}
SELECT * FROM kafka.`topic_1` ORDER BY kafkaMsgTimestamp DESC LIMIT 10
{code}
Image attachment case1_1 shows that the latest kafkaMsgTimestamp was 
1630631881114 (Fri Sep 03 2021 01:18:01 GMT+0000). 

 

However, applying a pushdown filter using kafkaMsgTimestamp with timestamp 
1631160000000 (Thu Sep 09 2021 04:00:00 GMT+0000):

 
{code:java}
SELECT * FROM kafka.`topic_1` WHERE kafkaMsgTimestamp > 1631160000000 LIMIT 10
{code}
Image attachment case1_2 shows that there are many messages with more recent 
timestamps. Thus, ordering on kafkaMsgTimestamp does not seem to return correct 
results.
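
As a sanity check on the two epoch-millisecond values quoted above, here is a minimal Java sketch that converts them to UTC and compares them. The class name EpochMillisCheck is hypothetical and is not part of Drill or Kafka; it uses only java.time from the standard library and assumes the values are milliseconds since the Unix epoch.

```java
import java.time.Instant;

// Hypothetical helper for this report; not part of Drill or Kafka.
final class EpochMillisCheck {
    // Interprets the value as milliseconds since the Unix epoch and
    // renders it as an ISO-8601 instant in UTC.
    static String toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long latestFromOrderBy = 1630631881114L; // top row of the ORDER BY query (case1_1)
        long filterThreshold = 1631160000000L;   // threshold used in the WHERE query (case1_2)

        System.out.println(toUtc(latestFromOrderBy)); // 2021-09-03T01:18:01.114Z
        System.out.println(toUtc(filterThreshold));   // 2021-09-09T04:00:00Z

        // True: the "latest" row from ORDER BY is older than the filter threshold,
        // so any row matched by the WHERE query should have sorted ahead of it.
        System.out.println(latestFromOrderBy < filterThreshold);
    }
}
```

This only confirms that the dates quoted above match the raw values; it says nothing about why the two queries disagree.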

 

*Case 2:*

Queries do not return correct results when using a WHERE clause unless an 
exact partition id and offset are provided.

topic_2 is a topic with > 200 partitions, using log compaction and in AVRO 
format.

 
{code:java}
SELECT * FROM kafka.`topic_2` WHERE `topic_2`.after.id = 1 AND 
`topic_2`.after.shop_id = 2 LIMIT 1
{code}
Image attachment case2_1 shows that Drill returns no record with id 1 and 
shop_id 2.

However, we manually confirmed that the record exists using a consumer and 
found its kafkaPartitionId and kafkaMsgOffset. We then added WHERE conditions 
on kafkaPartitionId and kafkaMsgTimestamp to speed up the query:
{code:java}
SELECT * FROM kafka.`topic_2` WHERE `topic_2`.after.id = 1 AND 
`topic_2`.after.shop_id = 2 AND kafkaPartitionId = 110 AND 
kafkaMsgTimestamp > 1628196400000 LIMIT 1
{code}
Image attachment case2_2 shows that the record still cannot be found by Drill.

 

Finally, the exact kafkaMsgOffset was specified, along with kafkaPartitionId 
and kafkaMsgTimestamp:
{code:java}
SELECT * FROM kafka.`topic_2` WHERE `topic_2`.after.id = 1 AND 
`topic_2`.after.shop_id = 2 AND kafkaPartitionId = 110 AND 
kafkaMsgOffset = 85785074 AND kafkaMsgTimestamp > 1628196400000 LIMIT 1
{code}
Image attachment case2_3 shows that Drill was only able to find the record when 
an exact partition and message offset were provided.

 

Is there any explanation for this behavior, or is this a bug? Thank you!

 


> Drill queries for Kafka storage plugin returning incorrect/missing result set
> -----------------------------------------------------------------------------
>
>                 Key: DRILL-7998
>                 URL: https://issues.apache.org/jira/browse/DRILL-7998
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Storage - Kafka
>    Affects Versions: 1.19.0
>            Reporter: Justin Chen
>            Priority: Major
>         Attachments: case1_1.png, case1_2.png, case2_1.png, case2_2.png, 
> case2_3.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
