[ 
https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488353#comment-16488353
 ] 

ASF GitHub Bot commented on DRILL-5977:
---------------------------------------

aravi5 commented on issue #1272: DRILL-5977: Filter Pushdown in Drill-Kafka 
plugin
URL: https://github.com/apache/drill/pull/1272#issuecomment-391573236
 
 
   Thanks @akumarb2010 for your comments. I have a few questions about them.
   
   >In Kafka, offset scope itself is per partition. I am unable to find any use 
case, where we can take the range of offsets and apply on all partitions. In 
most of the scenario's they may not be valid offsets.
   >IMHO, we should only apply predicate pushdown where we have exact scan 
specs.
   
   1. Take this query for example
   ```
   SELECT * FROM kafka.LogEventStream WHERE kafkaMsgOffset >= 1000 AND kafkaMsgOffset < 2000
   ```
   
   Did you mean that we should not apply predicate pushdown for such conditions? If 
we do not push down, every partition will be scanned from `startOffset` to 
`endOffset`. By supporting pushdown for such queries, we limit the scan range 
wherever possible. Are there any drawbacks to applying pushdown on such 
conditions?
   
   Also, the pushdown implementation handles cases where the specified offsets are 
invalid.
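   
   To make that concrete, here is a minimal sketch of the idea (the class and 
method names are illustrative, not the actual plugin code): the pushed-down 
offset range is intersected with each partition's real offsets via the standard 
Kafka consumer API, so an invalid or out-of-range offset can only shrink the 
scan, never break it.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   
   // Illustrative sketch only, not the plugin implementation.
   public class OffsetRangeClamp {
   
     // Intersects a pushed-down range [predicateStart, predicateEnd) with each
     // partition's valid [beginning, end) offsets; an empty intersection means
     // the partition can be skipped entirely.
     public static Map<TopicPartition, long[]> scanRanges(
         KafkaConsumer<?, ?> consumer, String topic,
         long predicateStart, long predicateEnd) {
   
       List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
           .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
           .collect(Collectors.toList());
   
       Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
       Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
   
       return partitions.stream().collect(Collectors.toMap(
           tp -> tp,
           tp -> new long[] {
               Math.max(predicateStart, begin.get(tp)),
               Math.min(predicateEnd, end.get(tp))
           }));
     }
   }
   ```
   
   With the query above, any partition whose clamped start is not less than its 
clamped end simply produces no scan spec.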
   
   2. Can you elaborate more on this?
   > we can use this predicate pushdown feature for external checkpointing 
mechanism.
   
   3. 
   >And coming to time stamps, my point is, in case of invalid partitionId, 
query might block indefinitely with this feature. Where as, without this 
feature, we will return empty results.
   
   Even with pushdown we will return empty results. Pushdown is applied to each 
predicate independently and the results are merged. The implementation ensures 
that `offsetsForTimes` is called only for valid partitions (i.e., partitions 
returned by `partitionsFor`), so we will not run into a situation where 
`offsetsForTimes` blocks indefinitely.
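   
   For reference, here is a minimal sketch of that validity check (again with 
illustrative names, using only the standard `KafkaConsumer` API and assuming a 
single requested partition):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   
   // Illustrative sketch only, not the plugin implementation.
   public class TimestampPushdown {
   
     // Returns the first offset at or after timestampMs for the requested
     // partition, or an empty map if the partition does not exist, so
     // offsetsForTimes is never called for an invalid partition.
     public static Map<TopicPartition, Long> offsetAfterTimestamp(
         KafkaConsumer<?, ?> consumer, String topic,
         int requestedPartition, long timestampMs) {
   
       Set<Integer> validPartitions = consumer.partitionsFor(topic).stream()
           .map(pi -> pi.partition())
           .collect(Collectors.toSet());
   
       Map<TopicPartition, Long> result = new HashMap<>();
       if (!validPartitions.contains(requestedPartition)) {
         return result;  // invalid partitionId: empty result, no blocking call
       }
   
       Map<TopicPartition, Long> query = new HashMap<>();
       query.put(new TopicPartition(topic, requestedPartition), timestampMs);
   
       consumer.offsetsForTimes(query).forEach((tp, oat) -> {
         if (oat != null) {  // null when no message exists at or after the timestamp
           result.put(tp, oat.offset());
         }
       });
       return result;
     }
   }
   ```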
   
   I will add a test case for this situation. (I have already added cases for 
predicates with invalid offsets and timestamps.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> predicate pushdown support kafkaMsgOffset
> -----------------------------------------
>
>                 Key: DRILL-5977
>                 URL: https://issues.apache.org/jira/browse/DRILL-5977
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: B Anil Kumar
>            Assignee: Abhishek Ravi
>            Priority: Major
>             Fix For: 1.14.0
>
>
> As part of Kafka storage plugin review, below is the suggestion from Paul.
> {noformat}
> Does it make sense to provide a way to select a range of messages: a starting 
> point or a count? Perhaps I want to run my query every five minutes, scanning 
> only those messages since the previous scan. Or, I want to limit my take to, 
> say, the next 1000 messages. Could we use a pseudo-column such as 
> "kafkaMsgOffset" for that purpose? Maybe
> SELECT * FROM <some topic> WHERE kafkaMsgOffset > 12345
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
