1. We don't need to worry about implementation details. But yes, we can
remove the method from the internal context, which already extends
`ProcessorContext`.

2. Same here: we can discuss it on the PR.
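A minimal sketch of point 1. It uses simplified stand-ins for the real interfaces: `ProcessorContextSketch`, `InternalProcessorContextSketch`, `CachingContextSketch` and its `setSystemTimeMs` hook are all hypothetical names, not Kafka Streams API. Once the public interface declares `currentSystemTimeMs()`, the internal interface inherits it and no longer needs its own declaration.

```java
// Hypothetical, heavily simplified stand-in for the public ProcessorContext;
// only the method under discussion is shown.
interface ProcessorContextSketch {
    /**
     * Returns the current cached wall-clock system timestamp in milliseconds.
     */
    long currentSystemTimeMs();
}

// The internal context inherits currentSystemTimeMs() from the public
// interface, so it does not redeclare it.
interface InternalProcessorContextSketch extends ProcessorContextSketch {
    // Hypothetical internal-only hook the runtime would use to refresh the cache.
    void setSystemTimeMs(long timeMs);
}

// Hypothetical implementation that serves the cached value.
class CachingContextSketch implements InternalProcessorContextSketch {
    private long cachedSystemTimeMs;

    @Override
    public void setSystemTimeMs(long timeMs) {
        cachedSystemTimeMs = timeMs;
    }

    @Override
    public long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}
```

User code only ever sees the public type, while the runtime keeps access to the internal setter through the subinterface.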


Btw: it seems you got enough votes. Can you close the vote? Looking
forward to your PR.


-Matthias

On 11/27/20 9:51 PM, Rohit Deshpande wrote:
> Hi,
> I would like to revive this KIP.
> 1. As per the proposed solution, we want to add the following method to
> the ProcessorContext interface:
> /**
>  * Returns current cached wall-clock system timestamp in milliseconds.
>  *
>  * @return the current cached wall-clock system timestamp in milliseconds
>  */
> long currentSystemTimeMs();
> However, the InternalProcessorContext interface already contains the same method:
> https://github.com/guozhangwang/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java#L54
> Would it make more sense to remove this method from
> InternalProcessorContext and add it to ProcessorContext?
> 2. I am thinking of adding a test in TopologyDriverTest in which a
> Processor uses currentSystemTimeMs() to decide what to do with an incoming
> record by comparing the record's timestamp with the wall-clock time.
> Similarly, we could add another test that fetches the stream time and
> takes an action on the incoming record based on it.
> 
> 
> Thanks,
> Rohit
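A rough sketch of the two tests described in point 2. It is written against a hypothetical `TimeAwareContext` interface rather than the real `ProcessorContext` or `TopologyTestDriver` APIs; `TimeAwareContext`, `MockTimeContext` and `LateRecordFilter` are illustrative names only. The processor forwards a record only if it is at most `maxAgeMs` behind either the wall-clock time or the stream time, and the test sets both clocks explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed time accessors on ProcessorContext.
interface TimeAwareContext {
    long currentSystemTimeMs();

    long currentStreamTimeMs();
}

// Test double: the test sets both clocks directly instead of reading real time.
class MockTimeContext implements TimeAwareContext {
    long systemTimeMs;
    long streamTimeMs;

    @Override
    public long currentSystemTimeMs() {
        return systemTimeMs;
    }

    @Override
    public long currentStreamTimeMs() {
        return streamTimeMs;
    }
}

// Hypothetical processor: forwards a record only if its timestamp is at most
// maxAgeMs behind the chosen reference clock (wall-clock or stream time).
class LateRecordFilter {
    private final TimeAwareContext context;
    private final long maxAgeMs;
    private final boolean useStreamTime;
    final List<Long> forwarded = new ArrayList<>();

    LateRecordFilter(TimeAwareContext context, long maxAgeMs, boolean useStreamTime) {
        this.context = context;
        this.maxAgeMs = maxAgeMs;
        this.useStreamTime = useStreamTime;
    }

    void process(long recordTimestampMs) {
        long referenceMs = useStreamTime
                ? context.currentStreamTimeMs()
                : context.currentSystemTimeMs();
        if (referenceMs - recordTimestampMs <= maxAgeMs) {
            forwarded.add(recordTimestampMs); // a real processor would forward the record here
        }
    }
}
```

Because both clocks come from the context, the same processor code runs unchanged in production and in a test; only the context implementation differs.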
> 
> 
> On 2020/08/14 05:07:04, "John Roesler" <v...@apache.org> wrote: 
>> Thanks for the reply, Matthias,
>>
>> I see what you mean. I suppose I was thinking that we would pass in the
>> cached system time, which is also what we’re proposing to add to the
>> ProcessorContext.
>>
>> If you think there’s something about the timestamp extractor in particular
>> that would make people want more precision, then something like Time would
>> do the trick. Since it’s not a public API, maybe just ‘Supplier<Long>’ would
>> be appropriate.
>>
>> But I also don’t want to bikeshed it. My only concern was that it’s awkward
>> to ask people to actually change their application code for testing. But
>> maybe in this case, an option is better than no option, and if people don’t
>> like it, we can always deprecate the mock extractor and evolve the interface
>> later.
>>
>> So, I’m +1 either way.
>>
>> Thanks,
>> John
>>
>> On Mon, Aug 3, 2020, at 16:28, Matthias J. Sax wrote:
>>> Interesting proposal.
>>>
>>> However, it raises the question of how the runtime would pass in the
>>> `systemTime` parameter. To be accurate, we would need to call
>>> `Time.milliseconds()` each time before we call the timestamp extractor.
>>> This sounds expensive, and maybe the extractor does not even use this value.
>>>
>>> Or we only call `Time.milliseconds()` periodically (as we also do in our
>>> runtime code) to make it cheap; however, we lose precision. Not sure if
>>> we can make this trade-off for the user?
>>>
>>> Handing in the `Time` object itself might be another idea; however, it
>>> seems "dangerous", as `Time` does not seem to be actual public API.
>>>
>>> Last, do we really think we need this feature? We never had a feature
>>> request for it, and I am not aware of any issue with the current
>>> TimestampExtractor interface.
>>>
>>> It's always easier to add it later if there is real demand, instead of
>>> proactively changing it (and maybe needing to deprecate and remove it
>>> later) with no clear benefit. Adding the `MockTimestampsExtractor` as
>>> part of the test-utils package seems less "dangerous" and should do the
>>> job, allowing us to collect feedback. If it's not good enough, we can
>>> still change the TimestampExtractor interface as a follow-up.
>>>
>>>
>>> -Matthias
>>>
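To make the precision/cost trade-off concrete, here is a minimal sketch of the "call the clock only periodically" option. `CachedClock` is a hypothetical wrapper around a `LongSupplier`, not Kafka's internal `Time` class. The underlying clock is read only on `refresh()`, so reads in between are cheap but may be stale; this is also why a cached value does not advance within a single `process()` call.

```java
import java.util.function.LongSupplier;

// Hypothetical cached clock: the underlying source (e.g. System::currentTimeMillis
// in production, a mock in tests) is consulted only when refresh() is called.
final class CachedClock {
    private final LongSupplier underlying;
    private long cachedMs;
    private long reads; // how often the underlying clock was actually consulted

    CachedClock(LongSupplier underlying) {
        this.underlying = underlying;
        refresh();
    }

    // Called by the runtime at chosen points, e.g. once per record or per poll loop.
    void refresh() {
        cachedMs = underlying.getAsLong();
        reads++;
    }

    // Cheap read that never touches the underlying clock; may be stale.
    long milliseconds() {
        return cachedMs;
    }

    long underlyingReads() {
        return reads;
    }
}
```

How stale the value can get depends entirely on how often the runtime calls `refresh()`, which is exactly the trade-off being weighed above.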
>>> On 7/28/20 10:03 AM, John Roesler wrote:
>>>> Thanks Matthias,
>>>>
>>>> This is a really good point. It might be a bummer
>>>> to have to actually change the topology between
>>>> testing and production. Do you think we can rather
>>>> evolve the TimestampExtractor interface to let
>>>> Streams pass in the current system time, along with
>>>> the current record and the current partition time?
>>>>
>>>> For example, we could add a new method:
>>>> long extract(
>>>>   ConsumerRecord<Object, Object> record,
>>>>   long partitionTime,
>>>>   long systemTime
>>>> );
>>>>
>>>> Then, Streams could pass in the current system
>>>> time, and TopologyTestDriver could pass the mocked
>>>> time. Additionally, users who implement
>>>> TimestampExtractor on their own would be able to
>>>> deterministically unit-test their own implementation.
>>>>
>>>> It's always a challenge to add to an interface without
>>>> breaking compatibility. In this case, it seems like
>>>> we could provide a default implementation that just
>>>> ignores the systemTime argument and calls
>>>> extract(record, partitionTime), and also deprecate
>>>> the existing method. Then custom implementations
>>>> would get a deprecation warning telling them to
>>>> implement the other method, and when we remove
>>>> the deprecated extract(record, partitionTime), we can
>>>> also drop the default implementation from the new
>>>> method.
>>>>
>>>> Specifically, what do you think about:
>>>> =================================
>>>> public interface TimestampExtractor {
>>>>     /**...
>>>>      * @deprecated Since 2.7. Implement
>>>>      *   {@code extract(ConsumerRecord<Object, Object> record, long partitionTime, long systemTime)} instead
>>>>      */
>>>>     @Deprecated
>>>>     long extract(
>>>>       ConsumerRecord<Object, Object> record,
>>>>       long partitionTime
>>>>     );
>>>>
>>>>     default long extract(
>>>>       ConsumerRecord<Object, Object> record,
>>>>       long partitionTime,
>>>>       long systemTime) {
>>>>         return extract(record, partitionTime);
>>>>     }
>>>> }
>>>> =================================
>>>>
>>>> Thanks,
>>>> -John
>>>>
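A self-contained sketch of the default-method evolution proposed above. `RecordSketch` is a hypothetical stand-in for `ConsumerRecord<Object, Object>`, and `TimestampExtractorSketch` is a hypothetical stand-in for the real interface. An implementation written against the old two-argument method keeps compiling, and the runtime can still call the new three-argument method on it.

```java
// Hypothetical stand-in for ConsumerRecord<Object, Object>: only the timestamp matters here.
final class RecordSketch {
    final long timestamp;

    RecordSketch(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for TimestampExtractor after the proposed change.
interface TimestampExtractorSketch {
    /** @deprecated Implement the three-argument variant instead. */
    @Deprecated
    long extract(RecordSketch record, long partitionTime);

    // New method: by default it ignores systemTime and delegates to the old one.
    default long extract(RecordSketch record, long partitionTime, long systemTime) {
        return extract(record, partitionTime);
    }
}

// An implementation written before the change: it compiles unchanged and is
// reachable through the new three-argument method.
class EmbeddedTimestampExtractor implements TimestampExtractorSketch {
    @Override
    public long extract(RecordSketch record, long partitionTime) {
        return record.timestamp;
    }
}

// A new implementation that uses the system time handed in by the runtime
// (or the mocked time in a test driver).
class SystemTimeExtractor implements TimestampExtractorSketch {
    @Override
    public long extract(RecordSketch record, long partitionTime) {
        // Still required, because the deprecated method remains abstract.
        throw new UnsupportedOperationException("call the three-argument variant");
    }

    @Override
    public long extract(RecordSketch record, long partitionTime, long systemTime) {
        return systemTime;
    }
}
```

One consequence of this design: because the deprecated two-argument method stays abstract, a new implementation that only cares about `systemTime` still has to provide some body for it until the old method is removed.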
>>>> On Sun, Jul 26, 2020, at 15:47, Matthias J. Sax wrote:
>>>>> Hi,
>>>>>
>>>>> I just had one more thought about an additional improvement we might
>>>>> want to include in this KIP.
>>>>>
>>>>> Kafka Streams ships with a `WallclockTimestampExtractor` that just
>>>>> returns `System.currentTimeMillis()` and thus cannot be mocked. And it
>>>>> seems that there is no way for a user to build a custom timestamp
>>>>> extractor that returns the TTD's mocked system time.
>>>>>
>>>>> Thus, it might be nice to add a `MockTimeExtractor` (only in the
>>>>> test-utils package) that users could set, and this `MockTimeExtractor`
>>>>> would return the mocked system time.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
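A minimal sketch of the proposed `MockTimeExtractor`, assuming it simply reads from an injected time source (the `Supplier<Long>` that John suggests later in the thread). `MockWallClock` and `MockTimeExtractorSketch` are hypothetical names, and the sketch deliberately does not implement the real `TimestampExtractor` interface.

```java
import java.util.function.Supplier;

// Hypothetical mock clock that a test driver could advance explicitly.
final class MockWallClock {
    private long nowMs;

    MockWallClock(long startMs) {
        this.nowMs = startMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }

    long nowMs() {
        return nowMs;
    }
}

// Hypothetical MockTimeExtractor: like a wall-clock extractor, but it reads the
// injected time source instead of System.currentTimeMillis(), so tests control it.
final class MockTimeExtractorSketch {
    private final Supplier<Long> timeSource;

    MockTimeExtractorSketch(Supplier<Long> timeSource) {
        this.timeSource = timeSource;
    }

    long extract(Object record, long partitionTime) {
        // The record and partition time are ignored, as with a wall-clock extractor.
        return timeSource.get();
    }
}
```

Wiring the extractor to the same clock the test advances is what makes the extracted timestamps deterministic.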
>>>>> On 7/7/20 11:11 PM, Matthias J. Sax wrote:
>>>>>> I think we don't need a default implementation for the new methods.
>>>>>>
>>>>>> What would be the use case for implementing the `ProcessorContext`
>>>>>> interface? In contrast to, for example, `KeyValueStore`,
>>>>>> `ProcessorContext` is a use-only interface because it's never passed
>>>>>> into Kafka Streams, but only handed out to the user.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 7/7/20 1:28 PM, William Bottrell wrote:
>>>>>>> Sure, I would appreciate help from Piotr creating an example.
>>>>>>>
>>>>>>> On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey John,
>>>>>>>>
>>>>>>>> Since ProcessorContext is a public API, I couldn't be sure that people
>>>>>>>> won't try to extend it. Without a default implementation, user code
>>>>>>>> compilation will break.
>>>>>>>>
>>>>>>>> William and Piotr, it seems that we haven't added any example usage of
>>>>>>>> the new API; could we try to address that? It should help with the
>>>>>>>> motivation and the follow-up meta comments John proposed.
>>>>>>>>
>>>>>>>> Boyang
>>>>>>>>
>>>>>>>> On Mon, Jul 6, 2020 at 12:04 PM Matthias J. Sax <mj...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> William,
>>>>>>>>>
>>>>>>>>> thanks for the KIP. LGTM. Feel free to start a vote.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 7/4/20 10:14 AM, John Roesler wrote:
>>>>>>>>>> Hi Richard,
>>>>>>>>>>
>>>>>>>>>> It’s good to hear from you!
>>>>>>>>>>
>>>>>>>>>> Thanks for bringing up the wall-clock suppression feature. IIRC,
>>>>>>>>>> someone actually started a KIP discussion for it already, but I don’t
>>>>>>>>>> think it went to a vote. I don’t recall any technical impediment, just
>>>>>>>>>> the lack of availability to finish it up. Although the two features
>>>>>>>>>> are related, it would be good to keep the KIPs separate.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> John
>>>>>>>>>>
>>>>>>>>>> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> This reminds me of a previous issue that I think we were discussing.
>>>>>>>>>>> @John Roesler <ma...@apache.org> I think you should remember
>>>>>>>>>>> this one.
>>>>>>>>>>>
>>>>>>>>>>> A while back, we were talking about having the suppress operator emit
>>>>>>>>>>> records by wall-clock time instead of stream time.
>>>>>>>>>>> If we are adding this, wouldn't that make it more feasible for us to
>>>>>>>>>>> implement that feature for suppression?
>>>>>>>>>>>
>>>>>>>>>>> If I recall correctly, there actually had been quite a bit of user
>>>>>>>>>>> demand for such a feature.
>>>>>>>>>>> It might be good to include it in this KIP (or maybe use one of the
>>>>>>>>>>> prior KIPs for this feature).
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Richard
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>>  1. Thanks, Boyang, it is nice to see usage examples in KIPs like
>>>>>>>>>>>>  this. They help during the discussion, and they are also good
>>>>>>>>>>>>  documentation later on.
>>>>>>>>>>>>
>>>>>>>>>>>>  2. Yeah, this is a subtle point. The motivation mentions being able
>>>>>>>>>>>>  to control the time during tests, but to make that work, the
>>>>>>>>>>>>  processor implementation needs a public method on ProcessorContext
>>>>>>>>>>>>  to get the time. Otherwise, processors would have to check the type
>>>>>>>>>>>>  of the context and cast, depending on whether they’re running inside
>>>>>>>>>>>>  a test or not. In retrospect, if we’d had a usage example, this
>>>>>>>>>>>>  probably would have been clear.
>>>>>>>>>>>>
>>>>>>>>>>>>  3. I don’t think we expect people to have their own implementations
>>>>>>>>>>>>  of ProcessorContext. Since all implementations are internal, it’s
>>>>>>>>>>>>  really an implementation detail whether we use a default method,
>>>>>>>>>>>>  abstract methods, or concrete methods. I can’t think of an
>>>>>>>>>>>>  implementation that really wants to just look up the system time.
>>>>>>>>>>>>  In the production code path, we cache the time for performance,
>>>>>>>>>>>>  and in testing, we use a mock time.
>>>>>>>>>>>>
>>>>>>>>>>>>  Thanks,
>>>>>>>>>>>>  John
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
>>>>>>>>>>>>  > 1. Makes sense; let me propose something.
>>>>>>>>>>>>  >
>>>>>>>>>>>>  > 2. That's not testing-only. The goal is to use the same API to
>>>>>>>>>>>>  > access the time in deployment and testing environments. The major
>>>>>>>>>>>>  > driver is System.currentTimeMillis(), which a) cannot be used in
>>>>>>>>>>>>  > tests, b) could, in specific cases, go backwards, and c) is not
>>>>>>>>>>>>  > compatible with the punctuator call. The idea is that we could
>>>>>>>>>>>>  > access the clock using a uniform API. For completeness, we should
>>>>>>>>>>>>  > have the same API for system and stream time.
>>>>>>>>>>>>  >
>>>>>>>>>>>>  > 3. There aren't that many subclasses. The two important ones are
>>>>>>>>>>>>  > ProcessorContextImpl and MockProcessorContext (and a third one:
>>>>>>>>>>>>  > ForwardingDisableProcessorContext). If a given implementation does
>>>>>>>>>>>>  > not support the schedule() call, there is no reason to support
>>>>>>>>>>>>  > clock access. The default implementation should just throw
>>>>>>>>>>>>  > UnsupportedOperationException, to prevent compilation errors in
>>>>>>>>>>>>  > possible subclasses.
>>>>>>>>>>>>  >
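Piotr's suggestion in point 3 can be sketched as follows. The interface and class names are hypothetical and do not mirror the real `ProcessorContext` hierarchy. The new accessor gets a default that throws `UnsupportedOperationException`, so pre-existing implementations still compile, and only contexts that actually support scheduling override it.

```java
// Hypothetical context interface: the new clock accessor has a throwing default,
// so implementations written before the method existed keep compiling.
interface SchedulingContextSketch {
    default long currentSystemTimeMs() {
        throw new UnsupportedOperationException("clock access is not supported by this context");
    }
}

// Hypothetical context that supports schedule() and therefore exposes its clock.
class FullContextSketch implements SchedulingContextSketch {
    private final long cachedTimeMs;

    FullContextSketch(long cachedTimeMs) {
        this.cachedTimeMs = cachedTimeMs;
    }

    @Override
    public long currentSystemTimeMs() {
        return cachedTimeMs;
    }
}

// Hypothetical pre-existing subclass: compiles unchanged, but throws if the
// clock is accessed.
class RestrictedContextSketch implements SchedulingContextSketch {
}
```

The trade-off versus an abstract method is that a missing override surfaces at runtime rather than at compile time.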
>>>>>>>>>>>>  > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote:
>>>>>>>>>>>>  > > Thanks Will for the KIP. A couple of questions and suggestions:
>>>>>>>>>>>>  > >
>>>>>>>>>>>>  > > 1. I think for new APIs to make the most sense, we should add a
>>>>>>>>>>>>  > > minimal example demonstrating how it could be useful to structure
>>>>>>>>>>>>  > > unit tests w/o the new APIs.
>>>>>>>>>>>>  > > 2. If this is a testing-only feature, could we add it only
>>>>>>>>>>>>  > > to MockProcessorContext?
>>>>>>>>>>>>  > > 3. Regarding the API, since this will be added to
>>>>>>>>>>>>  > > ProcessorContext, which has many subclasses, does it make sense
>>>>>>>>>>>>  > > to provide default implementations as well?
>>>>>>>>>>>>  > >
>>>>>>>>>>>>  > > Boyang
>>>>>>>>>>>>  > >
>>>>>>>>>>>>  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bottre...@gmail.com>
>>>>>>>>>>>>  > > wrote:
>>>>>>>>>>>>  > >
>>>>>>>>>>>>  > > > Thanks, John! I made the change. How long should I let the
>>>>>>>>>>>>  > > > discussion run before starting a VOTE?
>>>>>>>>>>>>  > > >
>>>>>>>>>>>>  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>  > > >
>>>>>>>>>>>>  > > > > Thanks, Will,
>>>>>>>>>>>>  > > > >
>>>>>>>>>>>>  > > > > That looks good to me. I would only add "cached" or something
>>>>>>>>>>>>  > > > > to indicate that it wouldn't just transparently look up the
>>>>>>>>>>>>  > > > > current System.currentTimeMillis every time.
>>>>>>>>>>>>  > > > >
>>>>>>>>>>>>  > > > > For example:
>>>>>>>>>>>>  > > > > /**
>>>>>>>>>>>>  > > > > * Returns current cached wall-clock system timestamp in milliseconds.
>>>>>>>>>>>>  > > > > *
>>>>>>>>>>>>  > > > > * @return the current cached wall-clock system timestamp in milliseconds
>>>>>>>>>>>>  > > > > */
>>>>>>>>>>>>  > > > > long currentSystemTimeMs();
>>>>>>>>>>>>  > > > >
>>>>>>>>>>>>  > > > > I don't want to give specific information about _when_ exactly
>>>>>>>>>>>>  > > > > the timestamp cache will be updated, so that we can adjust it in
>>>>>>>>>>>>  > > > > the future, but it does seem important to make people aware that
>>>>>>>>>>>>  > > > > they won't see the timestamp advance during the execution of
>>>>>>>>>>>>  > > > > Processor.process(), for example.
>>>>>>>>>>>>  > > > >
>>>>>>>>>>>>  > > > > With that modification, I'll be +1 on this proposal.
>>>>>>>>>>>>  > > > >
>>>>>>>>>>>>  > > > > Thanks again for the KIP!
>>>>>>>>>>>>  > > > > -John
>>>>>>>>>>>>  > > > >
>>>>>>>>>>>>  > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
>>>>>>>>>>>>  > > > > > Thanks, John! I appreciate you adjusting my lingo. I made the
>>>>>>>>>>>>  > > > > > change to the KIP. I will add the note about system time to the
>>>>>>>>>>>>  > > > > > javadoc.
>>>>>>>>>>>>  > > > > >
>>>>>>>>>>>>  > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vvcep...@apache.org>
>>>>>>>>>>>>  > > > > > wrote:
>>>>>>>>>>>>  > > > > >
>>>>>>>>>>>>  > > > > > > Hi Will,
>>>>>>>>>>>>  > > > > > >
>>>>>>>>>>>>  > > > > > > This proposal looks good to me overall. Thanks for the contribution!
>>>>>>>>>>>>  > > > > > >
>>>>>>>>>>>>  > > > > > > Just a couple of minor notes:
>>>>>>>>>>>>  > > > > > >
>>>>>>>>>>>>  > > > > > > The system time method would return a cached timestamp that
>>>>>>>>>>>>  > > > > > > Streams looks up once when it starts processing a record. This
>>>>>>>>>>>>  > > > > > > may be confusing, so it might be good to state it in the javadoc.
>>>>>>>>>>>>  > > > > > >
>>>>>>>>>>>>  > > > > > > I thought the javadoc for the stream time might be a bit
>>>>>>>>>>>>  > > > > > > confusing. We normally talk about “Tasks”, not “partition
>>>>>>>>>>>>  > > > > > > groups”, in the public API. Maybe just saying that it’s “the
>>>>>>>>>>>>  > > > > > > maximum timestamp of any record yet processed by the task”
>>>>>>>>>>>>  > > > > > > would be both high level an
> [message truncated...]
> 
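The stream-time definition suggested in the last (truncated) message, "the maximum timestamp of any record yet processed by the task", can be sketched as a running maximum. `StreamTimeTracker` is a hypothetical name, and using `Long.MIN_VALUE` as the "no record seen yet" value is an assumption of this sketch, not necessarily what Kafka Streams uses.

```java
// Hypothetical per-task stream-time tracker: stream time is the maximum record
// timestamp processed so far, so out-of-order records never move it backwards.
final class StreamTimeTracker {
    // Assumed sentinel for "no record processed yet".
    private long streamTimeMs = Long.MIN_VALUE;

    void onRecord(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    long currentStreamTimeMs() {
        return streamTimeMs;
    }
}
```

Unlike the cached wall-clock time, this value only moves when records arrive, which is why the thread treats system time and stream time as two separate accessors.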
