[
https://issues.apache.org/jira/browse/HUDI-5707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705663#comment-17705663
]
kazdy edited comment on HUDI-5707 at 3/27/23 8:39 PM:
------------------------------------------------------
I think we can follow what the Kafka Structured Streaming source does.
1. hoodie.datasource.read.streaming.failOnDataLoss: true || false (default:
false) - when false, if the start commit offset is not available in the active
timeline, start reading from the earliest available offset (detecting the
earliest available offset is already implemented); it is probably a good idea
to put something in the WARN log. When true, throw an exception informing
about potential data loss, which should stop the streaming query execution.
2. hoodie.datasource.read.streaming.fallback.fulltablescan.enable: true ||
false (default: false) - when true and failOnDataLoss is false (the default),
fall back to a full table scan when the "start offset" commit has been cleaned
or archived. We don't want to read from the earliest commit in the table in
this case, but rather treat cleaned/archived commits as if they were available
in the active timeline.
3. hoodie.datasource.read.streaming.maxInstantsPerTrigger: rate limit - how
many instants/commits to fetch per microbatch (this will also enable the new
AvailableNow trigger to work properly; at the moment it pulls all the changes,
same as "Trigger Once").
It might be a good idea to introduce
hoodie.datasource.read.streaming.maxBytesPerTrigger in the future (best
effort, as we can't split one commit instant across multiple triggers).
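Taken together, the semantics of the three proposed options could be sketched in plain Python. This is only a hypothetical illustration of the decision logic described above, not Hudi code; the option behaviours and helper names are assumptions based on this comment:

```python
def resolve_start_offset(requested, active_timeline,
                         fail_on_data_loss=False,
                         fallback_full_table_scan=False):
    """Decide how a streaming read resumes when `requested` (an instant time)
    may have been cleaned or archived out of the active timeline.

    Hypothetical sketch of the proposed
    hoodie.datasource.read.streaming.failOnDataLoss and
    hoodie.datasource.read.streaming.fallback.fulltablescan.enable behaviour.
    """
    if requested in active_timeline:
        return ("incremental", requested)
    if fail_on_data_loss:
        raise RuntimeError(
            f"Start offset {requested} is no longer in the active timeline; "
            "commits may have been cleaned/archived (potential data loss)."
        )
    if fallback_full_table_scan:
        # One-off snapshot read of the latest table state instead of resuming.
        return ("full_table_scan", None)
    # Default: warn and resume from the earliest available instant.
    print(f"WARN: {requested} not in active timeline, resuming from earliest")
    return ("incremental", min(active_timeline))


def limit_instants(pending_instants, max_instants_per_trigger):
    """Rate limit: fetch at most N instants per microbatch, as proposed for
    hoodie.datasource.read.streaming.maxInstantsPerTrigger."""
    return pending_instants[:max_instants_per_trigger]
```

With failOnDataLoss=false and no fallback, a cleaned-out offset degrades (with a WARN) to the earliest available instant; with the fallback enabled it becomes a full table scan instead.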
I don't think I want to introduce auto.offset.reset for Hudi, even though it
keeps some checkpoints internally for Structured Streaming (at least for the
batch id). I would like to stay as close to what Spark offers as possible.
If a user wants to reset checkpoints, the procedure is:
- remove the Spark checkpoint or change the checkpoint location,
- change hoodie.datasource.write.streaming.checkpoint.identifier to another
value to reset the internal Hudi checkpoint,
- set hoodie.datasource.streaming.startOffset to earliest/latest or a specific
instant, whichever the user needs.
(I would gladly change hoodie.datasource.streaming.startOffset to
hoodie.datasource.read.streaming.startOffset to keep the read config under
the read.streaming. prefix.)
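The reset procedure above boils down to changing three settings together. A hypothetical helper (the Hudi option names are the ones from this comment; the checkpoint paths are purely illustrative):

```python
def reset_checkpoints(options, new_checkpoint_location, new_identifier,
                      start_offset="earliest"):
    """Return a copy of the stream options with all three checkpoints reset:
    the Spark checkpoint location, the internal Hudi checkpoint identifier,
    and the Hudi start offset ("earliest", "latest", or a specific instant).

    Sketch only: in a real job these options would be passed to
    spark.readStream / writeStream on the Hudi source and sink.
    """
    opts = dict(options)
    opts["checkpointLocation"] = new_checkpoint_location
    opts["hoodie.datasource.write.streaming.checkpoint.identifier"] = new_identifier
    opts["hoodie.datasource.streaming.startOffset"] = start_offset
    return opts
```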
Regarding CDC stream read,
hoodie.datasource.read.streaming.fallback.fulltablescan.enable will not work
for a CDC query, so it would be good to throw an exception so that when users
want to do a CDC read with fallback to full table scan, Hudi prevents them
from doing so. Or just disable the fallback and log a warning.
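The exception variant of that guard could look like this (hypothetical validation sketch; the option name is the proposal above, and the `is_cdc_query` flag stands in for however the query type is detected):

```python
def validate_streaming_options(is_cdc_query, fallback_full_table_scan):
    """Reject the full-table-scan fallback for CDC queries, since a snapshot
    scan cannot produce the before/after change records a CDC read expects."""
    if is_cdc_query and fallback_full_table_scan:
        raise ValueError(
            "hoodie.datasource.read.streaming.fallback.fulltablescan.enable "
            "is not supported for CDC queries; disable it for this stream."
        )
```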
was (Author: JIRAUSER284048):
> Support offset reset strategy w/ spark streaming read from hudi table
> ---------------------------------------------------------------------
>
> Key: HUDI-5707
> URL: https://issues.apache.org/jira/browse/HUDI-5707
> Project: Apache Hudi
> Issue Type: Improvement
> Components: reader-core
> Reporter: sivabalan narayanan
> Assignee: kazdy
> Priority: Major
> Fix For: 1.0.0
>
>
> For users reading a hudi table in a streaming manner, we need to support an
> offset reset strategy if the commit of interest is archived or cleaned up.
>
> notes from the issue
> In streaming read, the user might want to get all incremental changes. From
> what I see, this is nothing but an incremental query on a hudi table. W/ an
> incremental query, we do have a fallback mechanism via
> {{{}hoodie.datasource.read.incr.fallback.fulltablescan.enable{}}}.
> But in streaming read, the amount of data read might spike up (if we plan to
> do the same) and the user may not have provisioned higher resources for the
> job.
> I am thinking if we should add something like {{auto.offset.reset}} that we
> have in kafka. If we have something similar in streaming read from spark
> itself, we can leverage the same; otherwise, add a new config in hoodie.
> So, users can configure what they want to do in such cases:
> # whether they wish to resume reading from the earliest valid commit in
> hudi. // the impl might be involved, since we need to detect the commit
> which hasn't been cleaned by the cleaner yet.
> # Or do snapshot query w/ latest table state.
> # Fail the streaming read.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)