Hi,


Sorry it took me some time to respond.

“but I thought we would pass the config through to the client.”

>> @John, sure, we can use the config in GlobalStreamThread; that could be one 
>> of the ways to solve it.

@Matthias, sure, cleaning the store and recreating it is one way, but since we 
give users an option to reset in StreamThread, why should the implementation 
be different in GlobalStreamThread? I think we should use the 
global.consumer.auto.offset.reset config to accept the reset strategy chosen by 
the user, although I would be OK with just cleaning and resetting to the latest 
for now as well. Currently, we throw a StreamsException on an 
InvalidOffsetException in GlobalStreamThread, so just resetting would still be 
better than what happens today.
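
For illustration, here is a minimal sketch of how a user might express separate reset policies for source topics and global topics with the prefixed config discussed above. It uses plain java.util.Properties with the key strings written out, so it needs no Kafka dependency. The class name, the globalResetPolicy helper, and its fallback argument are hypothetical and only show the intended lookup; they do not describe what GlobalStreamThread currently does.

```java
import java.util.Properties;

public class GlobalConsumerResetConfigSketch {
    // Key strings as they appear in this thread, written literally on purpose.
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Example values only: "latest" for source topics, "earliest" for global topics.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty(AUTO_OFFSET_RESET, "latest");
        props.setProperty(GLOBAL_CONSUMER_PREFIX + AUTO_OFFSET_RESET, "earliest");
        return props;
    }

    // Hypothetical helper: return the prefixed global policy if the user set one,
    // otherwise the supplied fallback (an assumption of this sketch).
    static String globalResetPolicy(Properties props, String fallback) {
        String prefixed = props.getProperty(GLOBAL_CONSUMER_PREFIX + AUTO_OFFSET_RESET);
        return prefixed != null ? prefixed : fallback;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println("global reset policy: " + globalResetPolicy(props, "none"));
    }
}
```

The fallback is passed in explicitly because the thread has not settled what should happen when the prefixed config is absent.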

Matthias, I found this comment in StreamsBuilder for GlobalKTable: ‘* Note that 
{@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code 
"earliest"} regardless of the specified value in {@link StreamsConfig} or 
{@link Consumed}.’ 
So, I guess we are already cleaning up and recreating from the earliest offset 
for GlobalKTable.

@Guozhang, while looking at the code, I also noticed a pending TODO in 
GlobalStateManagerImpl for the case when InvalidOffsetException is thrown. 
Earlier, we were directly clearing the store there and recreating it from 
scratch, but that piece of code has been removed now. Are you working on a 
follow-up PR for this, or should just handling the reset in GlobalStreamThread 
be sufficient?

Regards,
Navinder

    On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax 
<mj...@apache.org> wrote:  
 
 Atm, the config should be ignored and the global-consumer should use
"none" in a hard-coded way.

However, I am still wondering whether we actually want/need to allow users
to specify the reset policy. It might be worth considering just changing
the behavior: catch the exception, log an ERROR (for information
purposes), wipe the store, seekToBeginning(), and recreate the store?
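
As a rough, self-contained sketch of the sequence described above (catch, log an ERROR, wipe, seek to the beginning, recreate): the code below simulates the global topic with an in-memory list and the store with a map, so every class, field, and method name in it is a hypothetical stand-in and not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GlobalStoreResetSketch {
    // Hypothetical stand-in for the consumer's InvalidOffsetException.
    static class InvalidOffsetSketchException extends RuntimeException {
        InvalidOffsetSketchException(String message) { super(message); }
    }

    final List<String> topic;                          // simulated global topic
    final Map<Long, String> store = new HashMap<>();   // simulated global store, keyed by offset
    final List<String> errorLog = new ArrayList<>();   // simulated ERROR log
    long position;                                     // simulated consumer position

    GlobalStoreResetSketch(List<String> topic, long startPosition) {
        this.topic = topic;
        this.position = startPosition;
    }

    // Reads everything from the current position; fails if the position is invalid.
    void pollOnce() {
        if (position < 0 || position > topic.size()) {
            throw new InvalidOffsetSketchException("offset " + position + " is out of range");
        }
        while (position < topic.size()) {
            store.put(position, topic.get((int) position));
            position++;
        }
    }

    // Catch the exception, log an ERROR, wipe the store, rewind, and rebuild.
    void pollWithReset() {
        try {
            pollOnce();
        } catch (InvalidOffsetSketchException e) {
            errorLog.add("ERROR " + e.getMessage() + "; wiping store and restoring from the beginning");
            store.clear();   // wipe the store
            position = 0L;   // plays the role of seekToBeginning()
            pollOnce();      // recreate the store from scratch
        }
    }

    public static void main(String[] args) {
        GlobalStoreResetSketch sketch = new GlobalStoreResetSketch(Arrays.asList("a", "b", "c"), 42L);
        sketch.store.put(41L, "stale");
        sketch.pollWithReset();
        System.out.println(sketch.store.size() + " records in store, "
                + sketch.errorLog.size() + " error(s) logged");
    }
}
```

A user-selectable policy would change only the body of the catch block, for example by moving the position to the end of the topic instead of to the beginning.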

Btw: if we want to allow users to set the reset policy, this should be
possible via the config, or via overwriting the config in the method
itself. Thus, we would need to add the new overloaded method to
`Topology` and `StreamsBuilder`.

Another question to ask: what about GlobalKTables? Should they behave
the same? An alternative design could be, to allow users to specify a
flexible reset policy for global-stores, but not for GlobalKTables and
use the strategy suggested above for this case.

Thoughts?


-Matthias


On 7/2/20 2:14 PM, John Roesler wrote:
> Hi Navinder,
> 
> Thanks for the response. I’m sorry if I’m being dense... You said we are not 
> currently using the config, but I thought we would pass the config through to 
> the client.  Can you confirm whether or not the existing config works for 
> your use case?
> 
> Thanks,
> John
> 
> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
>> Sorry my bad. Found it.
>>
>>
>>
>> Prefix used to override {@link KafkaConsumer consumer} configs for the 
>> global consumer client from
>>
>> * the general consumer client configs. The override precedence is the 
>> following (from highest to lowest precedence):
>> * 1. global.consumer.[config-name]..
>> public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
>>
>>
>>
>> So, that's great. We already have a config exposed to reset offsets for 
>> global topics via global.consumer.auto.offset.reset just that we are 
>> not actually using it inside GlobalStreamThread to reset.
>>
>> -Navinder
>>    On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar 
>> <navinder_b...@yahoo.com.invalid> wrote:  
>>  
>>  Hi John,
>>
>> Thanks for your feedback. 
>> 1. I think there is some confusion about my first point. I am sure we 
>> can use the same enum, but for the external config that controls 
>> resetting in the global stream thread, we can either use the same one 
>> that users use for source topics (StreamThread) or provide a new one 
>> that specifically controls global topics. For example, currently if I 
>> get an InvalidOffsetException in any of my source topics, I can choose 
>> whether to reset from Earliest or Latest (with auto.offset.reset). Now 
>> we can either use the same option and say that if I get the same 
>> exception for global topics, I will follow the same reset policy, or 
>> add a new config. Some users might want totally different settings for 
>> source and global topics, for example resetting from Latest for source 
>> topics but from Earliest for global topics, and in that case adding a 
>> new config might be better.
>>
>> 2. I couldn't find this config currently: 
>> "global.consumer.auto.offset.reset". In fact, in GlobalStreamThread.java 
>> we throw a StreamsException for InvalidOffsetException, and there is 
>> a test for it as well, 
>> GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I think 
>> this is the config we are trying to introduce with this KIP.
>>
>> -Navinder
>>    On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler 
>> <j...@vvcephei.org> wrote:  
>>  
>>  Hi Navinder,
>>
>> Thanks for this proposal!
>>
>> Regarding your question about whether to use the same policy
>> enum or not, the underlying mechanism is the same, so I think
>> we can just use the same AutoOffsetReset enum.
>>
>> Can you confirm whether setting the reset policy config on the
>> global consumer currently works or not? Based on my reading
>> of StreamsConfig, it looks like it would be:
>> "global.consumer.auto.offset.reset".
>>
>> If that does work, would you still propose to augment the
>> Java API?
>>
>> Thanks,
>> -John
>>
>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
>>> Hi,
>>>
>>> KIP: 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
>>>
>>> I have taken over this KIP since it has been dormant for a long time 
>>> and it looks important for use cases that have large global data, where 
>>> rebuilding global stores from scratch might be overkill in case of an 
>>> InvalidOffsetException.
>>>
>>> We want to give users control over the reset policy (as we do in 
>>> StreamThread) in case they hit invalid offsets. I have not yet 
>>> decided whether to restrict this option to the same reset policy 
>>> used by StreamThread (via the auto.offset.reset config) or to add 
>>> another reset config specifically for global stores, 
>>> "global.auto.offset.reset", which gives users more control to choose 
>>> separate policies for global and stream threads.
>>>
>>> I would like to hear your opinions on the KIP.
>>>
>>>
>>> -Navinder
  
