[ 
https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487592#comment-16487592
 ] 

ASF GitHub Bot commented on DRILL-5977:
---------------------------------------

aravi5 commented on issue #1272: DRILL-5977: Filter Pushdown in Drill-Kafka plugin
URL: https://github.com/apache/drill/pull/1272#issuecomment-391413129
 
 
   @akumarb2010 - the user has the flexibility to provide conditions on any of the metadata fields (`kafkaMsgOffset`, `kafkaMsgTimestamp`, `kafkaPartitionId`). The conditions on these metadata fields are pushed down and translated to a **Scan Spec**, which is a collection of partition-specific scan specs.
   
   Let us consider the following Scan spec before pushdown.
   
   ```
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=0, startOffset=0, endOffset=1500)
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=1, startOffset=2000, endOffset=3000)
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=2, startOffset=25, endOffset=3500)
   ```
   
   If the predicates are only on `kafkaMsgOffset` (for example, `SELECT * FROM kafka.LogEventStream WHERE kafkaMsgOffset >= 1000 AND kafkaMsgOffset < 2000`), the pushdown applies to ALL partitions within the topic. If a partition has no such offsets (either the offsets have expired or messages for those offsets are yet to be produced), that partition will not be scanned.
   
   The scan spec in this case would be
   ```
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=0, startOffset=1000, endOffset=1500)
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=1, startOffset=1200, endOffset=2000)
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=2, startOffset=1000, endOffset=2000)
   ```
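
The narrowing of offsets can be read as an interval intersection per partition. The following is a minimal Python sketch of that idea, not the plugin's actual code: `PartitionScanSpec` and `push_down_offset_range` are hypothetical names, `end_offset` is assumed to be exclusive, and the topic name and offsets are made up for illustration rather than taken from the listings in this comment.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class PartitionScanSpec:
    """Hypothetical stand-in for KafkaPartitionScanSpec (end_offset assumed exclusive)."""
    topic_name: str
    partition_id: int
    start_offset: int
    end_offset: int


def push_down_offset_range(
    specs: List[PartitionScanSpec], lo: int, hi: int
) -> List[PartitionScanSpec]:
    """Intersect the predicate range [lo, hi) with every partition's offset range."""
    narrowed = []
    for spec in specs:
        start = max(spec.start_offset, lo)
        end = min(spec.end_offset, hi)
        # A partition with an empty intersection is dropped from the scan spec.
        if start < end:
            narrowed.append(
                PartitionScanSpec(spec.topic_name, spec.partition_id, start, end)
            )
    return narrowed


# Hypothetical offsets, chosen for illustration only.
before = [
    PartitionScanSpec("ExampleTopic", 0, 0, 1500),
    PartitionScanSpec("ExampleTopic", 1, 2500, 3000),  # no offsets below 2000
    PartitionScanSpec("ExampleTopic", 2, 25, 3500),
]

# Equivalent of: WHERE kafkaMsgOffset >= 1000 AND kafkaMsgOffset < 2000
after = push_down_offset_range(before, lo=1000, hi=2000)
for spec in after:
    print(spec)
```

In this sketch partition 1 is left out of the result because none of its offsets fall inside the requested range.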
   
   I am not sure we should introduce per-partition semantics in the query; I would prefer keeping it generic. The *scenario you mentioned in (1)* can still be addressed with the following predicates:
   
   ```
   SELECT * FROM kafka.LogEventStream
   WHERE (kafkaPartitionId = 1 AND kafkaMsgOffset >= 1000 AND kafkaMsgOffset < 2000)
      OR (kafkaPartitionId = 2 AND kafkaMsgOffset >= 1500 AND kafkaMsgOffset < 5000)
   ```
   
   The scan spec in this case would be as follows (notice that there is no 
`partitionId 0`).
   ```
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=1, startOffset=1200, endOffset=2000)
   KafkaPartitionScanSpec(topicName=TopicTable, partitionId=2, startOffset=1000, endOffset=2000)
   ```
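
A rough Python sketch of how OR-ed, partition-specific predicates could map to a scan spec is shown here. It is an illustration under assumptions, not the plugin's implementation: `scan_spec_for_disjuncts` is a hypothetical helper, ranges are assumed to be half-open `[start, end)`, and the available offsets are invented for the example.

```python
from typing import Dict, List, Tuple

OffsetRange = Tuple[int, int]  # [start, end), end exclusive (assumption)


def scan_spec_for_disjuncts(
    available: Dict[int, OffsetRange],
    disjuncts: List[Tuple[int, int, int]],
) -> Dict[int, OffsetRange]:
    """Build per-partition ranges from (partition_id, lo, hi) disjuncts.

    Partitions that no disjunct mentions never enter the result.
    """
    result: Dict[int, OffsetRange] = {}
    for partition_id, lo, hi in disjuncts:
        if partition_id not in available:
            continue  # predicate names a partition that does not exist
        start = max(available[partition_id][0], lo)
        end = min(available[partition_id][1], hi)
        if start >= end:
            continue  # nothing to read for this disjunct
        if partition_id in result:
            # Simplification: cover both ranges with one span. This may
            # over-read, so it assumes the filter is re-applied to scanned rows.
            prev_start, prev_end = result[partition_id]
            start, end = min(prev_start, start), max(prev_end, end)
        result[partition_id] = (start, end)
    return result


# Hypothetical available offsets per partition.
available = {0: (0, 1500), 1: (500, 3000), 2: (25, 3500)}

# (kafkaPartitionId = 1 AND offset in [1000, 2000))
# OR (kafkaPartitionId = 2 AND offset in [1500, 5000))
spec = scan_spec_for_disjuncts(available, [(1, 1000, 2000), (2, 1500, 5000)])
print(spec)
```

Partition 0 does not appear in `spec` because no disjunct refers to it.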
   
   This applies to conditions on `kafkaMsgTimestamp` as well. The most common use case is a user who wants to view messages that belong to a specific time window, regardless of partition. This can be done with predicates on `kafkaMsgTimestamp` alone, without having to specify `kafkaPartitionId`.
   ```
   SELECT * FROM kafka.LogEventStream
   WHERE kafkaMsgTimestamp > 1527092007199 AND kafkaMsgTimestamp < 1527092031717
   ```
   
   However, the user can also specify partition-specific conditions - *scenario mentioned in (2)*:
   
   ```
   SELECT * FROM kafka.LogEventStream
   WHERE (kafkaPartitionId = 1 AND kafkaMsgTimestamp > 1527092007199 AND kafkaMsgTimestamp < 1527092031717)
      OR (kafkaPartitionId = 2 AND kafkaMsgTimestamp > 1527092133501)
   ```
   
   This would also avoid the problem of calling `offsetsForTimes` on non-existent partitions, since such partitions are filtered out of the scan spec.
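
To illustrate the lookup involved, here is a small Python model of a timestamp-to-offset resolution restricted to the partitions left in the scan spec. It only mimics the documented behaviour of Kafka's `offsetsForTimes` (earliest offset whose timestamp is greater than or equal to the target, or nothing if no such message exists). The in-memory `logs` structure, the helper names, and the assumption that timestamps are non-decreasing within a partition are all simplifications made for this sketch.

```python
from bisect import bisect_left
from typing import Dict, Iterable, List, Optional, Tuple

# partition_id -> (offset of first retained message, timestamps in offset order)
PartitionLog = Tuple[int, List[int]]


def offset_for_time(log: PartitionLog, target_ts: int) -> Optional[int]:
    """Earliest offset whose timestamp is >= target_ts, or None if there is none."""
    base_offset, timestamps = log
    idx = bisect_left(timestamps, target_ts)
    if idx == len(timestamps):
        return None
    return base_offset + idx


def start_offsets_for_scan(
    logs: Dict[int, PartitionLog],
    scan_partitions: Iterable[int],
    after_ts: int,
) -> Dict[int, Optional[int]]:
    """Resolve `kafkaMsgTimestamp > after_ts` only for partitions in the scan spec."""
    # Strictly greater than after_ts is the same as >= after_ts + 1 (millisecond units).
    return {pid: offset_for_time(logs[pid], after_ts + 1) for pid in scan_partitions}


# Hypothetical partition contents.
logs = {
    1: (100, [1527092007000, 1527092007199, 1527092007200, 1527092008000]),
    2: (0, [1527092000000, 1527092001000]),
}

# Only partitions 1 and 2 are in the scan spec, so no other partition is looked up.
starts = start_offsets_for_scan(logs, scan_partitions=[1, 2], after_ts=1527092007199)
print(starts)
```

A `None` entry means the partition has no message newer than the requested timestamp, so it can be dropped from the scan.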



> predicate pushdown support kafkaMsgOffset
> -----------------------------------------
>
>                 Key: DRILL-5977
>                 URL: https://issues.apache.org/jira/browse/DRILL-5977
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: B Anil Kumar
>            Assignee: Abhishek Ravi
>            Priority: Major
>             Fix For: 1.14.0
>
>
> As part of Kafka storage plugin review, below is the suggestion from Paul.
> {noformat}
> Does it make sense to provide a way to select a range of messages: a starting 
> point or a count? Perhaps I want to run my query every five minutes, scanning 
> only those messages since the previous scan. Or, I want to limit my take to, 
> say, the next 1000 messages. Could we use a pseudo-column such as 
> "kafkaMsgOffset" for that purpose? Maybe
> SELECT * FROM <some topic> WHERE kafkaMsgOffset > 12345
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
