[ 
https://issues.apache.org/jira/browse/HUDI-5707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705663#comment-17705663
 ] 

kazdy edited comment on HUDI-5707 at 3/27/23 8:39 PM:
------------------------------------------------------

I think we can follow what the Kafka structured streaming source does.

1. hoodie.datasource.read.streaming.failOnDataLoss: true || false (default 
false) - when false, if the start commit offset is no longer available in the 
active timeline, start reading from the earliest available offset (detecting 
the earliest available offset is already implemented); probably a good idea to 
put something in the WARN log. When true, throw an exception informing about 
potential data loss, which should stop the streaming query.
2. hoodie.datasource.read.streaming.fallback.fulltablescan.enable: true || 
false (default false) - when true and failOnDataLoss is false (the default), 
fall back to a full-table scan when the "start offset" commit has been cleaned 
or archived. We don't want to read from the earliest commit in the table in 
this case, but rather treat cleaned/archived commits as if they were still 
available in the active timeline.
3. hoodie.datasource.read.streaming.maxInstantsPerTrigger: rate limit - how 
many instants/commits to fetch per microbatch (this will also enable the new 
AvailableNow trigger to work properly; at the moment it pulls all the changes, 
same as "Trigger Once").
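To make the interaction between options 1 and 2 concrete, here is a minimal 
sketch (plain Python, hypothetical helper names, not actual Hudi internals) of 
how the start offset could be resolved:

```python
# Sketch of the proposed start-offset resolution. Names and the
# "FULL_TABLE_SCAN" sentinel are illustrative assumptions.

class DataLossError(Exception):
    """Raised when failOnDataLoss=true and the start commit is gone."""

def resolve_start_offset(start_commit, active_timeline,
                         fail_on_data_loss=False,
                         fallback_full_table_scan=False):
    """Decide where a streaming read should start.

    active_timeline: commit instants still present, oldest first.
    Returns the commit to start from, or the "FULL_TABLE_SCAN" sentinel.
    """
    if start_commit in active_timeline:
        return start_commit  # normal case: the offset is still available
    if fail_on_data_loss:
        raise DataLossError(
            f"start offset {start_commit} was cleaned/archived; "
            "potential data loss")
    if fallback_full_table_scan:
        # treat cleaned/archived commits as if they were still readable
        return "FULL_TABLE_SCAN"
    # default: warn and resume from the earliest commit still on the timeline
    print(f"WARN: start offset {start_commit} not in active timeline, "
          f"resuming from {active_timeline[0]}")
    return active_timeline[0]
```

The point of the ordering is that failOnDataLoss=true always wins: the fallback 
is only consulted when the user has explicitly accepted potential data loss.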

It might be a good idea to introduce 
hoodie.datasource.read.streaming.maxBytesPerTrigger in the future (best 
effort, as we can't split one commit instant across multiple triggers).
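The instant-based rate limit in option 3 can be sketched as a simple slice over 
the pending timeline (illustrative Python, not actual Hudi code): each 
microbatch consumes at most maxInstantsPerTrigger commits past the last 
checkpointed offset, so AvailableNow drains the backlog in bounded steps 
instead of one huge batch.

```python
# Sketch of instant-based rate limiting per microbatch.
def next_batch(pending_instants, last_offset, max_instants_per_trigger=None):
    """Return the instants for the next microbatch and the new end offset.

    pending_instants: commit instants in timeline order (sortable strings).
    """
    remaining = [i for i in pending_instants if i > last_offset]
    if max_instants_per_trigger is not None:
        remaining = remaining[:max_instants_per_trigger]
    new_offset = remaining[-1] if remaining else last_offset
    return remaining, new_offset
```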

I don't think I want to introduce auto.offset.reset for Hudi, even though it 
keeps some checkpoints internally for structured streaming (at least the batch 
id). I would like to stay as close to what Spark offers as possible.
If a user wants to reset checkpoints, the procedure is:
 - remove the Spark checkpoint or change the checkpoint location,
 - change hoodie.datasource.write.streaming.checkpoint.identifier to a 
different value to reset the internal Hudi checkpoint,
 - set hoodie.datasource.streaming.startOffset to earliest/latest or a 
specific instant, whichever the user needs.
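In PySpark the reset procedure above might look roughly like this (a sketch 
only: it assumes a SparkSession `spark` with the Hudi bundle on the classpath, 
and the paths are illustrative; the option names are the ones discussed in 
this ticket):

```python
# Sketch of a checkpoint reset for a Hudi-to-Hudi streaming job.
df = (spark.readStream
      .format("hudi")
      # step 3: pick the new start offset: earliest / latest / a specific instant
      .option("hoodie.datasource.streaming.startOffset", "earliest")
      .load("/data/hudi/source_table"))

(df.writeStream
   .format("hudi")
   # step 2: a new identifier resets Hudi's internal streaming checkpoint
   .option("hoodie.datasource.write.streaming.checkpoint.identifier", "job-v2")
   # step 1: a new location resets Spark's own checkpoint
   .option("checkpointLocation", "/checkpoints/source_table_v2")
   .start("/data/hudi/target_table"))
```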

(I would gladly rename hoodie.datasource.streaming.startOffset to 
hoodie.datasource.read.streaming.startOffset to keep read configs under the 
read.streaming. prefix.)

Regarding CDC stream reads, 
hoodie.datasource.read.streaming.fallback.fulltablescan.enable will not work 
for CDC queries, so it would be good to throw an exception: when users try to 
combine a CDC read with the full-table-scan fallback, Hudi should prevent them 
from doing so. Alternatively, just disable the fallback and log a warning.
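The fail-fast variant could be a simple option validation at query planning 
time; a sketch in plain Python (the CDC format option name is assumed from the 
existing incremental-query configs, the fallback option name is the proposed 
one):

```python
# Sketch of the proposed guard: reject the full-table-scan fallback
# when a CDC streaming read is requested.
def validate_streaming_options(opts):
    """Raise if mutually exclusive streaming read options are combined."""
    cdc = opts.get("hoodie.datasource.query.incremental.format") == "cdc"
    fallback = opts.get(
        "hoodie.datasource.read.streaming.fallback.fulltablescan.enable") == "true"
    if cdc and fallback:
        raise ValueError(
            "full-table-scan fallback is not supported for CDC streaming "
            "reads; disable one of the two options")
```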



> Support offset reset strategy w/ spark streaming read from hudi table
> ---------------------------------------------------------------------
>
>                 Key: HUDI-5707
>                 URL: https://issues.apache.org/jira/browse/HUDI-5707
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: reader-core
>            Reporter: sivabalan narayanan
>            Assignee: kazdy
>            Priority: Major
>             Fix For: 1.0.0
>
>
> For users reading a hudi table in a streaming manner, we need to support an 
> offset reset strategy if the commit of interest is archived or cleaned up. 
>  
> notes from the issue 
> In streaming read, user might want to get all incremental changes. from what 
> I see, this is nothing but an incremental query on a hudi table. w/ 
> incremental query, we do have fallback mechanism via 
> {{{}hoodie.datasource.read.incr.fallback.fulltablescan.enable{}}}.
> But in streaming read, the amount of data read might spike up(if we plan to 
> do the same) and the user may not have provisioned higher resources for the 
> job.
> I am thinking whether we should add something like the {{auto.offset.reset}} 
> we have in kafka. If we have something similar in streaming read from spark 
> itself, we can leverage the same or add a new config in hoodie.
> So, users can configure what they want to do in such cases:
>  # whether they wish to resume reading from the earliest valid commit in hudi.
> // impl might be involved, since we need to detect the commit which hasn't 
> been cleaned by the cleaner yet.
>  # Or do snapshot query w/ latest table state.
>  # Fail the streaming read.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
