> > Does my reply address your concerns?
Yes; also, I definitely misread part of the proposal earlier and thought you had put the timestamp field in RecordMetadata. Sorry for not giving things a closer look before responding! I'm not sure my original message made much sense given the misunderstanding, but thanks for responding anyway :P Having given the proposal a second pass, I agree, it's very elegant. +1 On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org> wrote: > Thanks for the reply, Sophie, > > I think I may have summarized too much in my prior reply. > > In the currently proposed KIP, any caller of forward() must > supply a Record, which consists of: > * key > * value > * timestamp > * headers (with a convenience constructor that sets empty > headers) > > These aren't what I was referring to as potentially being > undefined downstream, since thanks to the introduction of > Record, they are, as you're advocating, required to be > defined everywhere, even when forwarding from a punctuator. > > So to be clear, the intent of this change is actually to > _enforce_ that timestamp would never be undefined (which it > currently can be). Also, since punctuators _are_ going to > have to "make up" a timestamp going forward, we should note > that the "punctuate" method currently passes in a good > timestamp that they can use: for system-time punctuations, > they receive the current system time, and for stream-time > punctuations, they get the current stream time. > > The potentially undefined RecordMetadata only contains these > fields: > * topic > * partition > * offset > > These fields aren't required (or even used) in a Sink, and > it doesn't seem like they would be important to many > applications. Furthermore, it doesn't _seem_ like you'd even > want to set these fields. They seem purely informational and > only useful in the context when you are actually processing > a real input record. It doesn't sound like you were asking > for it, but just to put it on the record, I think if we were > to require values for the metadata from punctuators, people > would mostly just make up their own dummy values, to no > one's benefit. > > I should also note that with the current > Record/RecordMetadata split, we will have the freedom to > move fields into the Record class (or even add new fields) > if we want them to become "data" as opposed to "metadata" in > the future. > > Thanks for your reply; I was similarly floored when I > realized the true nature of the current situation. Does my > reply address your concerns? > > Thanks, > -John > > On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman > wrote: > > > However, the record metadata is only defined when the parent forwards > > > while processing a > > > > real record, not when it calls forward from the punctuator > > > > > > Can we take a step back for a second...why wouldn't you be required to > set > > the RecordContext > > yourself when calling forward from a Punctuator? I think I agree with > Paul > > here, it seems kind of > > absurd not to enforce that the RecordContext be present inside the > > process() method. > > > > The original problem with Punctuators, as I understood it, was that all > of > > the RecordContext > > fields were exposed automatically to both the Processor and any > Punctuator, > > due to being > > direct methods on the ProcessorContext. We can't control which > > ProcessorContext methods > > someone will call from with a Punctuator vs from a Processor. The best we > > could do was > > set these "nonsense" fields to null when inside a Punctuator, or set them > > to some dummy > > values as you pointed out. > > > > But then you proposed the solution of a separate RecordContext which is > not > > attached to the > > ProcessorContext at all. This seemed to solve the above problem very > > neatly: we only pass > > in the RecordContext to the process() method, so we don't have to worry > > about people trying > > to access these fields from within a Punctuator. The fields aren't > > accessible unless they're > > defined. > > > > So what happens when someone wants to forward something from within a > > Punctuator? I > > don't think it's reasonable to let the timestamp field be undefined, > ever. > > What if the Punctuator > > forwards directly to a sink, or directly to some windowing logic. Are we > > supposed to add > > handling for the RecordContext == null case to every processor? Or are we > > just going to > > assume the implicit restriction that users will only forward records > from a > > Punctuator to > > downstream processors that know how to handle and/or set the > RecordContext > > if it's > > undefined. That seems to throw away a lot of the awesome safety added in > > this KIP > > > > Apologies for the rant. But I feel pretty strongly that allowing to > forward > > records from a > > Punctuator without a defined RecordContext would be asking for trouble. > > Imo, if you > > want to forward from a Punctuator, you need to store the info you need in > > order to > > set the timestamp, or make one up yourself > > > > (the one alternative I can think of here is that maybe we could pass in > the > > current > > partition time, so users can at least put in a reasonable estimate for > the > > timestamp > > that won't cause it to get dropped and won't potentially lurch the > > streamtime far into > > the future. This would be similar to what we do in the > TimestampExtractor) > > > > On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org> > wrote: > > > > > Oh, I guess one other thing I should have mentioned is that I’ve > recently > > > discovered that in cases where the context is undefined, we currently > just > > > fill in dummy values for the context. So there’s a good chance that > real > > > applications in use are depending on undefined context without even > > > realizing it. What I’m hoping to do is just make the situation > explicit and > > > get rid of the dummy values. > > > > > > Thanks, > > > John > > > > > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote: > > > > Thanks for the review, Paul! > > > > > > > > I had read some of that debate before. There seems to be some subtext > > > > there, because they advise against using Optional in cases like this, > > > > but there doesn’t seem to be a specific reason why it’s > inappropriate. > > > > I got the impression they were just afraid that people would go > > > > overboard and make everything Optional. > > > > > > > > I could also make two methods, but it seemed like it might be an > > > > unfortunate way to handle the issue, since Processor is just about a > > > > Function as-is, but the two-method approach would require people to > > > > implement both methods. > > > > > > > > To your question, this is something that’s only recently became clear > > > > to me. Imagine you have a parent processor that calls forward both > from > > > > process and a punctuator. The child will have process() invoked in > both > > > > cases, and won’t be able to distinguish them. However, the record > > > > metadata is only defined when the parent forwards while processing a > > > > real record, not when it calls forward from the punctuator. > > > > > > > > This is why I wanted to make the metadata Optional, to advertise that > > > > the metadata might be undefined if any ancestor processor ever calls > > > > forward from a punctuator. We could remove the Optional and instead > > > > just document that the argument might be null. > > > > > > > > With that context in place, what’s your take? > > > > > > > > Thanks, > > > > John > > > > > > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote: > > > > > Looks pretty good to me, though the Processor#process(Record, > > > > > Optional<RecordMetadata>) signature caught my eye. There's some > > > debate > > > > > ( > > > > > > > > > https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments > > > ) > > > > > about whether to use Optionals in arguments, and while that's a > bit of > > > a > > > > > religious debate in the abstract, it did make me wonder whether it > > > makes > > > > > sense in this specific case. When is it actually not present? I > was > > > > > under > > > > > the impression that we should always have access to it in > process(), > > > and > > > > > that the concern about metadata being undefined was about having > > > access > > > > > to > > > > > record metadata in the ProcessorContext held for use inside a > > > > > Punctuator. > > > > > > > > > > If that's not the case and it is truly optional in process(), is > there > > > an > > > > > opportunity for an alternate interface for the cases when we don't > get > > > it, > > > > > rather than force the branching on implementers of the interface? > > > > > > > > > > Apologies if I've missed something, I took a look at the PR and I > > > didn't > > > > > see any spots where I thought it would be empty. Perhaps an > example > > > of a > > > > > Punctuator using (and not using) the new API would clear things up. > > > > > > > > > > Best, > > > > > Paul > > > > > > > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org> > > > wrote: > > > > > > Hello again, all, > > > > > > > > > > > > Thanks for the latest round of discussion. I've taken the > > > > > > recent feedback and come up with an updated KIP that seems > > > > > > actually quite a bit nicer than the prior proposal. > > > > > > > > > > > > The specific diff on the KIP is here: > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14 > > > > > > These changes are implemented in this POC PR: > > > > > > https://github.com/apache/kafka/pull/9346 > > > > > > > > > > > > The basic idea is that, building on the recent conversaion, > > > > > > we would transition away from the current API where we get > > > > > > only key/value in the process() method and other "data" > > > > > > comes in the ProcessorContext along with the "metadata". > > > > > > > > > > > > Instead, we formalize what is "data" and what is "metadata", > > > > > > and pass it all in to the process method: > > > > > > Processor#process(Record, Optional<RecordMetadata>) > > > > > > > > > > > > Also, you forward the whole data class instead of mutating > > > > > > the ProcessorContext fields and also calling forward: > > > > > > ProcessorContext#forward(Record) > > > > > > > > > > > > The Record class itself ships with methods like > > > > > > record#withValue(NewV newValue) > > > > > > that make a shallow copy of the input Record, enabling > > > > > > Processors to safely handle the record without polluting the > > > > > > context of their parents and siblings. > > > > > > > > > > > > This proposal has a number of key benefits: > > > > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate > > > > > > the Headers via the ProcessorContext. This proposal offers a > > > > > > way to safely forward changes only to downstream processors. > > > > > > * The new API has symmetry (each processor's input is the > > > > > > output of its parent processor) > > > > > > * The API makes clear that the record metadata isn't always > > > > > > defined (for example, in a punctuation, there is no current > > > > > > topic/partition/offset) > > > > > > * The API enables punctuators to forward well defined > > > > > > headers downstream, which is currently not possible. > > > > > > > > > > > > Unless their are objections, I'll go ahead and re-finalize > > > > > > this KIP and update that PR to a mergeable state. > > > > > > > > > > > > Thanks, all, > > > > > > -John > > > > > > > > > > > > > > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote: > > > > > > > Interesting proposal. However, I am not totally convinced, > because > > > I see > > > > > > > a fundamental difference between "data" and "metadata". > > > > > > > > > > > > > > Topic/partition/offset are "metadata" in the strong sense and > they > > > are > > > > > > > immutable. > > > > > > > > > > > > > > On the other hand there is "primary" data like key and value, > as > > > well as > > > > > > > "secondary" data like timestamp and headers. The issue seems > that > > > we > > > > > > > treat "secondary data" more like metadata atm? > > > > > > > > > > > > > > Thus, promoting timestamp and headers into a first class > citizen > > > roll > > > > > > > make sense to me (my original proposal about `RecordContext` > would > > > still > > > > > > > fall short with this regard). However, putting both (data and > > > metadata) > > > > > > > into a `Record` abstraction might go too far? > > > > > > > > > > > > > > I am also a little bit concerned about `Record.copy()` because > it > > > might > > > > > > > be a trap: Users might assume it does a full deep copy of the > > > record, > > > > > > > however, it would not. It would only create a new `Record` > object > > > as > > > > > > > wrapper that points to the same key/value/header objects as the > > > input > > > > > > > record. > > > > > > > > > > > > > > With the current `context.forward(key, value)` we don't have > this > > > "deep > > > > > > > copy" issue -- it's pretty clear what is happening. > > > > > > > > > > > > > > Instead of `To.all().withTimestamp()` we could also add > > > > > > > `context.forward(key, value, timestamp)` etc (just wondering > about > > > the > > > > > > > exposition in overload)? > > > > > > > > > > > > > > Also, `Record.withValue` etc sounds odd? Should a record not be > > > > > > > immutable? So, we could have something like > > > > > > > > > > > > > > > > > > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`. > > > > > > > But it looks rather verbose? > > > > > > > > > > > > > > The other question is of course, to what extend to we want to > keep > > > the > > > > > > > distinction between "primary" and "secondary" data? To me, > it's a > > > > > > > question of easy of use? > > > > > > > > > > > > > > Just putting all this out to move the discussion forward. Don't > > > have a > > > > > > > concrete proposal atm. > > > > > > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > > > > > > > On 9/14/20 9:24 AM, John Roesler wrote: > > > > > > > > Thanks for this thought, Matthias! > > > > > > > > > > > > > > > > To be honest, it's bugged me quite a bit that _all_ the > > > > > > > > record information hasn't been an argument to `process`. I > > > > > > > > suppose I was trying to be conservative in this proposal, > > > > > > > > but then again, if we're adding new Processor and > > > > > > > > ProcessorContext interfaces, then this is the time to make > > > > > > > > such a change. > > > > > > > > > > > > > > > > To be unambiguous, I think this is what we're talking about: > > > > > > > > ProcessorContext: > > > > > > > > * applicationId > > > > > > > > * taskId > > > > > > > > * appConfigs > > > > > > > > * appConfigsWithPrefix > > > > > > > > * keySerde > > > > > > > > * valueSerde > > > > > > > > * stateDir > > > > > > > > * metrics > > > > > > > > * schedule > > > > > > > > * commit > > > > > > > > * forward > > > > > > > > > > > > > > > > StateStoreContext: > > > > > > > > * applicationId > > > > > > > > * taskId > > > > > > > > * appConfigs > > > > > > > > * appConfigsWithPrefix > > > > > > > > * keySerde > > > > > > > > * valueSerde > > > > > > > > * stateDir > > > > > > > > * metrics > > > > > > > > * register > > > > > > > > > > > > > > > > > > > > > > > > RecordContext > > > > > > > > * topic > > > > > > > > * partition > > > > > > > > * offset > > > > > > > > * timestamp > > > > > > > > * headers > > > > > > > > > > > > > > > > > > > > > > > > Your proposal sounds good to me as-is. Just to cover the > > > > > > > > bases, though, I'm wondering if we should push the idea just > > > > > > > > a little farther. Instead of decomposing key,value,context, > > > > > > > > we could just keep them all in one object, like this: > > > > > > > > > > > > > > > > Record: > > > > > > > > * key > > > > > > > > * value > > > > > > > > * topic > > > > > > > > * partition > > > > > > > > * offset > > > > > > > > * timestamp > > > > > > > > * headers > > > > > > > > > > > > > > > > Then, we could have: > > > > > > > > Processor#process(Record) > > > > > > > > ProcessorContext#forward(Record, To) > > > > > > > > > > > > > > > > Viewed from this perspective, a record has three properties > > > > > > > > that people may specify in their processors: key, value, and > > > > > > > > timestamp. > > > > > > > > > > > > > > > > We could deprecate `To#withTimestamp` and enable people to > > > > > > > > specify the timestamp along with the key and value when they > > > > > > > > forward a record. > > > > > > > > > > > > > > > > E.g., > > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record) > > > > > > > > toForward.withKey(newKey) > > > > > > > > toForward.withValue(newValue) > > > > > > > > toForward.withTimestamp(newTimestamp) > > > > > > > > Record newRecord = toForward.build() > > > > > > > > context.forward(newRecord, To.child("child1")) > > > > > > > > > > > > > > > > Or, the more compact common case: > > > > > > > > current: > > > > > > > > context.forward(key, "newValue") > > > > > > > > proposed: > > > > > > > > context.forward(copy(record).withValue("newValue").build()) > > > > > > > > > > > > > > > > > > > > > > > > It's slightly more verbose, but also more extensible. This > > > > > > > > would give us a clean path to add header support in PAPI as > > > > > > > > well, simply by adding `withHeaders` in RecordBuilder. > > > > > > > > > > > > > > > > It's also more symmetrical, since the recipient of `forward` > > > > > > > > would just get the sent `Record`. Whereas today, the sender > > > > > > > > puts the timestamp in `To`, but the recipient gets in in its > > > > > > > > own `ProcessorContext`. > > > > > > > > > > > > > > > > WDYT? > > > > > > > > -John > > > > > > > > > > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote: > > > > > > > > > I think separating the different contexts make sense. > > > > > > > > > > > > > > > > > > In fact, we could even go one step further and remove the > > > record > > > > > > context > > > > > > > > > from the processor context completely and we add a third > > > parameter to > > > > > > > > > `process(key, value, recordContext)`. This would make it > clear > > > that > > > > > > the > > > > > > > > > context is for the input record only and it's not possible > to > > > pass > > > > > > it to > > > > > > > > > a `punctuate` callback. > > > > > > > > > > > > > > > > > > For the stores and changelogging: I think there are two > cases. > > > (1) > > > > > > You > > > > > > > > > use a plain key-value store. For this case, it seems you do > > > not care > > > > > > > > > about the timestamp and thus does not care what timestamp > is > > > set in > > > > > > the > > > > > > > > > changelog records. (We can set anything we want, as it's > not > > > > > > relevant at > > > > > > > > > all -- the timestamp is ignored on read anyway.) (2) The > other > > > case > > > > > > is, > > > > > > > > > that one does care about timestamps, and for this case > should > > > use > > > > > > > > > TimestampedKeyValueStore. The passed timestamp will be set > on > > > the > > > > > > > > > changelog records for this case. > > > > > > > > > > > > > > > > > > Thus, for both cases, accessing the record context does not > > > seems to > > > > > > be > > > > > > > > > a requirement. And providing access to the processor > context > > > to, eg., > > > > > > > > > `forward()` or similar seems safe. > > > > > > > > > > > > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote: > > > > > > > > > > Thanks for the reply, Paul! > > > > > > > > > > > > > > > > > > > > I certainly intend to make sure that the changelogging > layer > > > > > > > > > > continues to work the way it does now, by hook or by > crook. > > > > > > > > > > I think the easiest path for me is to just "cheat" and > get > > > > > > > > > > the real ProcessorContext into the ChangeLoggingStore > > > > > > > > > > implementation somehow. I'll tag you on the PR when I > create > > > > > > > > > > it, so you have an opportunity to express a preference > about > > > > > > > > > > the implementation choice, and maybe even compile/test > > > > > > > > > > against it to make sure your stuff still works. > > > > > > > > > > > > > > > > > > > > Regarding this: > > > > > > > > > > > > > > > > > > > > > we have an interest in making a state store with a > richer > > > > > > > > > > > way of querying its data (like perhaps getting all > values > > > > > > > > > > > associated with a secondary key), while still > ultimately > > > > > > > > > > > writing to the changelog topic for later restoration. > > > > > > > > > > > > > > > > > > > > This is very intriguing to me. On the side, I've been > > > > > > > > > > preparing a couple of ideas related to this topic. I > don't > > > > > > > > > > think I have a coherent enough thought to even express > it in > > > > > > > > > > a Jira right now, but when I do, I'll tag you on it also > to > > > > > > > > > > see what you think. > > > > > > > > > > > > > > > > > > > > Whenever you're ready to share the usability improvement > > > > > > > > > > ideas, I'm very interested to see what you've come up > with. > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote: > > > > > > > > > > > > when you use a HashMap or RocksDB or other "state > > > stores", you > > > > > > don't > > > > > > > > > > > > expect them to automatically know extra stuff about > the > > > record > > > > > > you're > > > > > > > > > > > > storing. > > > > > > > > > > > > > > > > > > > > > > So, I don't think there is any reason we *can't* > retain the > > > > > > record context > > > > > > > > > > > > in the StateStoreContext, and if any users came along > > > with a > > > > > > clear use case > > > > > > > > > > > > I'd find that convincing. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree with the principle of being conservative with > the > > > > > > StateStoreContext > > > > > > > > > > > API. Regarding user expectations or a clear use case, > the > > > only > > > > > > > > > > > counterpoint I would offer is that we sort of have that > > > use case > > > > > > already, > > > > > > > > > > > which is the example I gave of the change logging store > > > using the > > > > > > > > > > > timestamp. I am curious if this functionality will be > > > retained > > > > > > when using > > > > > > > > > > > built in state stores, or will a low-level processor > get a > > > > > > KeyValueStore > > > > > > > > > > > that no longer writes to the changelog topic with the > > > record's > > > > > > timestamp. > > > > > > > > > > > While I personally don't care much about that > functionality > > > > > > specifically, I > > > > > > > > > > > have a general desire for custom state stores to > easily do > > > the > > > > > > things that > > > > > > > > > > > built in state stores do. > > > > > > > > > > > > > > > > > > > > > > It genuinely did not occur to me that users might be > > > looking up > > > > > > and/or > > > > > > > > > > > > updating records of other keys from within a > Processor. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm glad you said this Sophie, because it gives me an > > > > > > opportunity to say > > > > > > > > > > > that this is actually a *huge* use case for my team. > The > > > state > > > > > > store > > > > > > > > > > > usability improvements I was referring to in my > previous > > > message > > > > > > were about > > > > > > > > > > > enabling the user to write custom stores while still > easily > > > > > > hooking into > > > > > > > > > > > the ability to write to a changelog topic. I think > that is > > > > > > technically > > > > > > > > > > > possible now, but I don't think it's trivial. > > > Specifically, we > > > > > > have an > > > > > > > > > > > interest in making a state store with a richer way of > > > querying > > > > > > its data > > > > > > > > > > > (like perhaps getting all values associated with a > > > secondary > > > > > > key), while > > > > > > > > > > > still ultimately writing to the changelog topic for > later > > > > > > restoration. > > > > > > > > > > > We recognize that this use case throws away some of > what > > > kafka > > > > > > streams > > > > > > > > > > > (especially the DSL) is good at - easy > parallelizability by > > > > > > partitioning > > > > > > > > > > > all processing by key - and that our business logic > would > > > > > > completely fall > > > > > > > > > > > apart if we were consuming from multi-partition topics > with > > > > > > multiple > > > > > > > > > > > consumers. But we have found that using the low level > > > processor > > > > > > API is > > > > > > > > > > > good for the very simple stream processing primitives > it > > > > > > provides: handling > > > > > > > > > > > the plumbing of consuming from multiple kafka topics > and > > > > > > potentially > > > > > > > > > > > updating persistent local state in a reliable way. > That in > > > > > > itself has > > > > > > > > > > > proven to be a worthwhile programming model. > > > > > > > > > > > > > > > > > > > > > > Since I got off track a bit, let me summarize: I don't > > > > > > particularly care > > > > > > > > > > > about the record context being available to state store > > > > > > implementations, > > > > > > > > > > > and I think this KIP is headed in the right direction > in > > > that > > > > > > regard. But > > > > > > > > > > > more generally, I wanted to express the importance of > > > > > > maintaining a > > > > > > > > > > > powerful and flexible StateStore interface. > > > > > > > > > > > > > > > > > > > > > > Thanks! > > > > > > > > > > > Paul > > > > > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman < > > > > > > sop...@confluent.io> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Aha, I did misinterpret the example in your previous > > > response > > > > > > regarding the > > > > > > > > > > > > range query after all. I thought you just meant a > > > time-range > > > > > > query inside a > > > > > > > > > > > > punctuator. It genuinely did not occur to me that > users > > > might > > > > > > be looking up > > > > > > > > > > > > and/or updating records of other keys from within a > > > Processor. > > > > > > Sorry for > > > > > > > > > > > > being closed minded > > > > > > > > > > > > > > > > > > > > > > > > I won't drag out this discussion any further by > asking > > > whether > > > > > > that might > > > > > > > > > > > > be > > > > > > > > > > > > a valid use case or just a lurking bug in itself :) > > > > > > > > > > > > > > > > > > > > > > > > Thanks for humoring me. The current proposal for > KIP-478 > > > > > > sounds good to me > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler < > > > > > > vvcep...@apache.org> wrote: > > > > > > > > > > > > > Ah, thanks Sophie, > > > > > > > > > > > > > > > > > > > > > > > > > > I'm sorry for misinterpreting your resonse. Yes, we > > > > > > > > > > > > > absolutely can and should clear the context before > > > > > > > > > > > > > punctuating. > > > > > > > > > > > > > > > > > > > > > > > > > > My secondary concern is maybe more far-fetched. I > was > > > > > > > > > > > > > thinking that inside process(key,value), a > Processor > > > might > > > > > > > > > > > > > do a get/put of a _different_ key. Consider, for > > > example, > > > > > > > > > > > > > the way that Suppress processors work. When they > get a > > > > > > > > > > > > > record, they add it to the store and then do a > range > > > scan > > > > > > > > > > > > > and possibly forward a _different_ record. Of > course, > > > this > > > > > > > > > > > > > is an operation that is deeply coupled to the > > > internals, and > > > > > > > > > > > > > the Suppress processor accordingly actually does > get > > > access > > > > > > > > > > > > > to the internal context so that it can set the > context > > > > > > > > > > > > > before forwarding. > > > > > > > > > > > > > > > > > > > > > > > > > > Still, it seems like I've had a handful of > > > conversations > > > > > > > > > > > > > with people over the years in which they tell me > they > > > are > > > > > > > > > > > > > using state stores in a way that transcends the > "get > > > and put > > > > > > > > > > > > > the currently processing record" access pattern. I > > > doubt > > > > > > > > > > > > > that those folks would even have considered the > > > possiblity > > > > > > > > > > > > > that the currently processing record's _context_ > could > > > > > > > > > > > > > pollute their state store operations, as I myself > > > never gave > > > > > > > > > > > > > it a second thought until the current conversation > > > began. In > > > > > > > > > > > > > cases like that, we have actually set a trap for > these > > > > > > > > > > > > > people, and it seems better to dismantle the trap. > > > > > > > > > > > > > > > > > > > > > > > > > > As you noted, really the only people who would be > > > negatively > > > > > > > > > > > > > impacted are people who implement their own state > > > stores. > > > > > > > > > > > > > These folks will get the deprecation warning and > try to > > > > > > > > > > > > > adapt their stores to the new interface. If they > needed > > > > > > > > > > > > > access to the record context, they would find it's > now > > > > > > > > > > > > > missing. They'd ask us about it, and we'd have the > > > ability > > > > > > > > > > > > > to explain the lurking bug that they have had in > their > > > > > > > > > > > > > stores all along, as well as the new recommended > > > pattern > > > > > > > > > > > > > (just pass everything you need in the value). If > that's > > > > > > > > > > > > > unsatisfying, _then_ we should consider amending > the > > > API. > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie > Blee-Goldman > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Regarding your first sentence, "...the > processor > > > would > > > > > > null > > > > > > > > > > > > > > > out the record context...", this is not > possible, > > > since > > > > > > the > > > > > > > > > > > > > > > processor doesn't have write access to the > > > context. We > > > > > > could > > > > > > > > > > > > > > > add it, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Sorry, this was poorly phrased, I definitely did > not > > > mean > > > > > > to imply that > > > > > > > > > > > > > we > > > > > > > > > > > > > > should make the context modifiable by the > Processors > > > > > > themselves. I > > > > > > > > > > > > meant > > > > > > > > > > > > > > this should be handled by the internal processing > > > > > > framework that deals > > > > > > > > > > > > > with > > > > > > > > > > > > > > passing records from one Processor to the next, > > > setting > > > > > > the record > > > > > > > > > > > > > context > > > > > > > > > > > > > > when a new record is picked up, invoking the > > > punctuators, > > > > > > etc. I > > > > > > > > > > > > believe > > > > > > > > > > > > > > this > > > > > > > > > > > > > > all currently happens in the StreamTask? It > already > > > can > > > > > > and does > > > > > > > > > > > > > overwrite > > > > > > > > > > > > > > the record context as new records are processed, > and > > > is > > > > > > also > > > > > > > > > > > > responsible > > > > > > > > > > > > > > for calling the punctuators, so it doesn't seem > like > > > a > > > > > > huge leap to > > > > > > > > > > > > just > > > > > > > > > > > > > say > > > > > > > > > > > > > > "null out the current record before punctuating" > > > > > > > > > > > > > > > > > > > > > > > > > > > > To clarify, I was never advocating or even > > > considering to > > > > > > give the > > > > > > > > > > > > > > Processors > > > > > > > > > > > > > > write access to the record context. Sorry if my > last > > > > > > message (or all of > > > > > > > > > > > > > > them) > > > > > > > > > > > > > > was misleading. I just wanted to point out that > the > > > > > > punctuator concern > > > > > > > > > > > > is > > > > > > > > > > > > > > orthogonal to the question of whether we should > > > include > > > > > > the record > > > > > > > > > > > > > context > > > > > > > > > > > > > > in the StateStoreContext. It's definitely a real > > > problem, > > > > > > but it's a > > > > > > > > > > > > > > problem > > > > > > > > > > > > > > that exists at the Processor level and not just > the > > > > > > StateStore. > > > > > > > > > > > > > > So, I don't think there is any reason we *can't* > > > retain > > > > > > the record > > > > > > > > > > > > > context > > > > > > > > > > > > > > in the > > > > > > > > > > > > > > StateStoreContext, and if any users came along > with a > > > > > > clear use case > > > > > > > > > > > > I'd > > > > > > > > > > > > > > find > > > > > > > > > > > > > > that convincing. In the absence of any examples, > the > > > > > > conservative > > > > > > > > > > > > > approach > > > > > > > > > > > > > > sounds good to me. > > > > > > > > > > > > > > > > > > > > > > > > > > > > If it turns out that someone did need the record > > > context > > > > > > in their > > > > > > > > > > > > custom > > > > > > > > > > > > > > state > > > > > > > > > > > > > > store, I'm sure they'll submit a politely worded > bug > > > > > > report alerting us > > > > > > > > > > > > > > that we > > > > > > > > > > > > > > broke their application. > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler < > > > > > > vvcep...@apache.org> > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Thanks, Sophie, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, now that you point it out, I can see that > the > > > record > > > > > > > > > > > > > > > context itself should be nulled out by Streams > > > before > > > > > > > > > > > > > > > invoking punctuators. From that perspective, we > > > don't > > > > > > need > > > > > > > > > > > > > > > to think about the second-order problem of > what's > > > in the > > > > > > > > > > > > > > > context for the state store when called from a > > > > > > punctuator. > > > > > > > > > > > > > > > Regarding your first sentence, "...the > processor > > > would > > > > > > null > > > > > > > > > > > > > > > out the record context...", this is not > possible, > > > since > > > > > > the > > > > > > > > > > > > > > > processor doesn't have write access to the > > > context. We > > > > > > could > > > > > > > > > > > > > > > add it, but then all kinds of strange effects > > > would ensue > > > > > > > > > > > > > > > when downstream processors execute but the > context > > > is > > > > > > empty, > > > > > > > > > > > > > > > etc. Better to just let the framework manage > the > > > record > > > > > > > > > > > > > > > context and keep it read-only for Processors. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Reading between the lines of your last reply, > it > > > sounds > > > > > > that > > > > > > > > > > > > > > > the disconnect may just have been a mutual > > > > > > misunderstanding > > > > > > > > > > > > > > > about whether or not Processors currently have > > > access to > > > > > > set > > > > > > > > > > > > > > > the record context. Since they do not, if we > > > wanted to > > > > > > add > > > > > > > > > > > > > > > the record context to StateStoreContext in a > > > well-defined > > > > > > > > > > > > > > > way, we'd also have to add the ability for > > > Processors to > > > > > > > > > > > > > > > manipulate it. But then, we're just creating a > > > > > > side-channel > > > > > > > > > > > > > > > for Processors to pass some information in > > > arguments to > > > > > > > > > > > > > > > "put()" and other information implicitly > through > > > the > > > > > > > > > > > > > > > context. It seems better just to go for a > single > > > channel > > > > > > for > > > > > > > > > > > > > > > now. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It sounds like you're basically in favor of the > > > > > > conservative > > > > > > > > > > > > > > > approach, and you just wanted to understand the > > > blockers > > > > > > > > > > > > > > > that I implied. Does my clarification make > sense? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie > > > Blee-Goldman > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > I was just thinking that the processor would > > > null out > > > > > > the record > > > > > > > > > > > > > context > > > > > > > > > > > > > > > > after it > > > > > > > > > > > > > > > > finished processing the record, so I'm not > sure I > > > > > > follow why this > > > > > > > > > > > > > would > > > > > > > > > > > > > > > not > > > > > > > > > > > > > > > > be > > > > > > > > > > > > > > > > possible? AFAIK we never call a punctuator > in the > > > > > > middle of > > > > > > > > > > > > > processing a > > > > > > > > > > > > > > > > record through the topology, and even if we > did, > > > we > > > > > > still know when > > > > > > > > > > > > > it is > > > > > > > > > > > > > > > > about > > > > > > > > > > > > > > > > to be called and could set it to null > beforehand. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm not trying to advocate for it here, I'm > in > > > > > > agreement that > > > > > > > > > > > > > anything > > > > > > > > > > > > > > > you > > > > > > > > > > > > > > > > want > > > > > > > > > > > > > > > > to access within the store can and should be > > > accessed > > > > > > within the > > > > > > > > > > > > > calling > > > > > > > > > > > > > > > > Processor/Punctuator before reaching the > store. > > > The > > > > > > "we can always > > > > > > > > > > > > > add it > > > > > > > > > > > > > > > > later if necessary" argument is also pretty > > > > > > convincing. Just trying > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > understand > > > > > > > > > > > > > > > > why this wouldn't be possible. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > FWIW, the question of "what is the current > > > record in > > > > > > the context > > > > > > > > > > > > of a > > > > > > > > > > > > > > > > Punctuator" > > > > > > > > > > > > > > > > exists independently of whether we want to > add > > > this to > > > > > > the > > > > > > > > > > > > > > > StateStoreContext > > > > > > > > > > > > > > > > or not. The full ProcessorContext, including > the > > > > > > current record > > > > > > > > > > > > > context, > > > > > > > > > > > > > > > is > > > > > > > > > > > > > > > > already available within a Punctuator, so > > > removing the > > > > > > current > > > > > > > > > > > > record > > > > > > > > > > > > > > > > context > > > > > > > > > > > > > > > > from the StateStoreContext does not solve the > > > problem. > > > > > > Users can -- > > > > > > > > > > > > > and > > > > > > > > > > > > > > > have > > > > > > > > > > > > > > > > (see KAFKA-9584 < > > > > > > https://issues.apache.org/jira/browse/KAFKA-9584 > > > > > > > > > > > > > ;;) > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > hit > > > > > > > > > > > > > > > > such subtle bugs without ever invoking a > > > StateStore > > > > > > > > > > > > > > > > from their punctuator. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Again, I think I do agree that we should > leave > > > the > > > > > > current record > > > > > > > > > > > > > context > > > > > > > > > > > > > > > > off of > > > > > > > > > > > > > > > > the StateStoreContext, but I don't think the > > > > > > Punctuator argument > > > > > > > > > > > > > against > > > > > > > > > > > > > > > it > > > > > > > > > > > > > > > > is > > > > > > > > > > > > > > > > very convincing. It sounds to me like we > need to > > > > > > disallow access to > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > current > > > > > > > > > > > > > > > > record context from within the Punctuator, > > > independent > > > > > > of anything > > > > > > > > > > > > > to do > > > > > > > > > > > > > > > > with > > > > > > > > > > > > > > > > state stores > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler > < > > > > > > vvcep...@apache.org> > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Thanks for the thoughts, Sophie. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree that the extra information could be > > > useful. > > > > > > My only > > > > > > > > > > > > > concern is > > > > > > > > > > > > > > > > > that it doesn’t seem like we can actually > > > supply > > > > > > that extra > > > > > > > > > > > > > information > > > > > > > > > > > > > > > > > correctly. So, then we have a situation > where > > > the > > > > > > system offers > > > > > > > > > > > > > useful > > > > > > > > > > > > > > > API > > > > > > > > > > > > > > > > > calls that are only correct in a narrow > range > > > of use > > > > > > cases. > > > > > > > > > > > > > Outside of > > > > > > > > > > > > > > > > > those use cases, you get incorrect > behavior. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > If it were possible to null out the context > > > before > > > > > > you put a > > > > > > > > > > > > > document > > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > which the context doesn’t apply, then the > > > concern > > > > > > would be > > > > > > > > > > > > > mitigated. > > > > > > > > > > > > > > > But > > > > > > > > > > > > > > > > > it would still be pretty weird from the > > > perspective > > > > > > of the store > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > sometimes the context is populated and > other > > > times, > > > > > > it’s null. > > > > > > > > > > > > > > > > > But that seems moot, since it doesn’t seem > > > possible > > > > > > to null out > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > context. Only the Processor could know > whether > > > it’s > > > > > > about to put > > > > > > > > > > > > a > > > > > > > > > > > > > > > document > > > > > > > > > > > > > > > > > different from the context or not. And it > > > would be > > > > > > inappropriate > > > > > > > > > > > > to > > > > > > > > > > > > > > > offer a > > > > > > > > > > > > > > > > > public ProcessorContext api to manage the > > > record > > > > > > context. > > > > > > > > > > > > > > > > > Ultimately, it still seems like if you > want to > > > store > > > > > > headers, you > > > > > > > > > > > > > can > > > > > > > > > > > > > > > > > store them explicitly, right? That doesn’t > seem > > > > > > onerous to me, > > > > > > > > > > > > and > > > > > > > > > > > > > it > > > > > > > > > > > > > > > kind > > > > > > > > > > > > > > > > > of seems better than relying on undefined > or > > > > > > asymmetrical > > > > > > > > > > > > behavior > > > > > > > > > > > > > in > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > store itself. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t > solve > > > these > > > > > > problems. > > > > > > > > > > > > Just > > > > > > > > > > > > > > > that it > > > > > > > > > > > > > > > > > seems a little that we can be conservative > and > > > avoid > > > > > > them for > > > > > > > > > > > > now. > > > > > > > > > > > > > If > > > > > > > > > > > > > > > it > > > > > > > > > > > > > > > > > turns out we really need to solve them, we > can > > > > > > always do it > > > > > > > > > > > > later. > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > John > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie > > > Blee-Goldman > > > > > > wrote: > > > > > > > > > > > > > > > > > > > If you were to call "put" from a > > > punctuator, or > > > > > > do a > > > > > > > > > > > > > > > > > > > `range()` query and then update one of > > > those > > > > > > records with > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug > on > > > your > > > > > > hands. > > > > > > > > > > > > > > > > > > Can you elaborate on this a bit? I agree > > > that the > > > > > > punctuator > > > > > > > > > > > > > case is > > > > > > > > > > > > > > > an > > > > > > > > > > > > > > > > > > obvious exemption to the assumption that > > > store > > > > > > invocations > > > > > > > > > > > > always > > > > > > > > > > > > > > > > > > have a corresponding "current record", > but I > > > don't > > > > > > understand > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > second example. Are you envisioning a > > > scenario > > > > > > where the > > > > > > > > > > > > #process > > > > > > > > > > > > > > > > > > method performs a range query and then > > > updates > > > > > > records? Or were > > > > > > > > > > > > > > > > > > you just giving another example of the > > > punctuator > > > > > > case? > > > > > > > > > > > > > > > > > > I only bring it up because I agree that > the > > > > > > current record > > > > > > > > > > > > > > > information > > > > > > > > > > > > > > > > > could > > > > > > > > > > > > > > > > > > still be useful within the context of the > > > store. > > > > > > As a non-user > > > > > > > > > > > > my > > > > > > > > > > > > > > > input > > > > > > > > > > > > > > > > > on > > > > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > > definitely has limited value, but it just > > > isn't > > > > > > striking me as > > > > > > > > > > > > > > > obvious > > > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > > we > > > > > > > > > > > > > > > > > > should remove access to the current > record > > > context > > > > > > from the > > > > > > > > > > > > state > > > > > > > > > > > > > > > stores. > > > > > > > > > > > > > > > > > > If there is no current record, as in the > > > > > > punctuator case, we > > > > > > > > > > > > > should > > > > > > > > > > > > > > > just > > > > > > > > > > > > > > > > > > set > > > > > > > > > > > > > > > > > > the record context to null (or > > > Optional.empty, > > > > > > etc). > > > > > > > > > > > > > > > > > > That said, the put() always has to come > from > > > > > > somewhere, and > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > > somewhere is always going to be either a > > > Processor > > > > > > or a > > > > > > > > > > > > > Punctuator, > > > > > > > > > > > > > > > both > > > > > > > > > > > > > > > > > > of which will still have access to the > full > > > > > > context. So > > > > > > > > > > > > > additional > > > > > > > > > > > > > > > info > > > > > > > > > > > > > > > > > > such as > > > > > > > > > > > > > > > > > > the timestamp can and should probably be > > > supplied > > > > > > to the store > > > > > > > > > > > > > before > > > > > > > > > > > > > > > > > > calling put(), rather than looked up by > the > > > store. > > > > > > But I can > > > > > > > > > > > > see > > > > > > > > > > > > > some > > > > > > > > > > > > > > > > > other > > > > > > > > > > > > > > > > > > things being useful, for example the > current > > > > > > record's headers. > > > > > > > > > > > > > Maybe > > > > > > > > > > > > > > > > > if/when > > > > > > > > > > > > > > > > > > we add better (or any) support for > headers in > > > > > > state stores this > > > > > > > > > > > > > will > > > > > > > > > > > > > > > be > > > > > > > > > > > > > > > > > > less true. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Of course as John has made clear, it's > > > pretty hard > > > > > > to judge > > > > > > > > > > > > > without > > > > > > > > > > > > > > > > > > examples > > > > > > > > > > > > > > > > > > and more insight as to what actually > goes on > > > > > > within a custom > > > > > > > > > > > > > state > > > > > > > > > > > > > > > store > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John > Roesler < > > > > > > > > > > > > vvcep...@apache.org > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Paul, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It's good to hear from you! > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm glad you're in favor of the > direction. > > > > > > Especially when > > > > > > > > > > > > > > > > > > > it comes to public API and usability > > > concens, I > > > > > > tend to > > > > > > > > > > > > > > > > > > > think that "the folks who matter" are > > > actually > > > > > > the folks who > > > > > > > > > > > > > > > > > > > have to use the APIs to accomplish real > > > tasks. > > > > > > It can be > > > > > > > > > > > > > > > > > > > hard for me to be sure I'm thinking > > > clearly from > > > > > > that > > > > > > > > > > > > > > > > > > > perspective. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Funny story, I also started down this > road > > > a > > > > > > couple of times > > > > > > > > > > > > > > > > > > > already and backed them out before the > KIP > > > > > > because I was > > > > > > > > > > > > > > > > > > > afraid of the scope of the proposal. > > > > > > Unfortunately, needing > > > > > > > > > > > > > > > > > > > to make a new ProcessorContext kind of > > > forced my > > > > > > hand. > > > > > > > > > > > > > > > > > > > I see you've called me out about the > > > > > > ChangeLogging stores :) > > > > > > > > > > > > > > > > > > > In fact, I think these are the > main/only > > > reason > > > > > > that stores > > > > > > > > > > > > > > > > > > > might really need to invoke > "forward()". My > > > > > > secret plan was > > > > > > > > > > > > > > > > > > > to cheat and either accomplish > > > change-logging by > > > > > > a different > > > > > > > > > > > > > > > > > > > mechanism than implementing the store > > > interface, > > > > > > or by just > > > > > > > > > > > > > > > > > > > breaking encapsulation to sneak the > "real" > > > > > > ProcessorContext > > > > > > > > > > > > > > > > > > > into the ChangeLogging stores. But > those > > > are all > > > > > > > > > > > > > > > > > > > implementation details. I think the key > > > question > > > > > > is whether > > > > > > > > > > > > > > > > > > > anyone else has a store implementation > that > > > > > > needs to call > > > > > > > > > > > > > > > > > > > "forward()". It's not what you > mentioned, > > > but > > > > > > since you > > > > > > > > > > > > > > > > > > > spoke up, I'll just ask: if you have a > use > > > case > > > > > > for calling > > > > > > > > > > > > > > > > > > > "forward()" in a store, please share > it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Regarding the other record-specific > context > > > > > > methods, I think > > > > > > > > > > > > > > > > > > > you have a good point, but I also can't > > > quite > > > > > > wrap my head > > > > > > > > > > > > > > > > > > > around how we can actually guarantee > it to > > > work > > > > > > in general. > > > > > > > > > > > > > > > > > > > For example, the case you cited, where > the > > > > > > implementation of > > > > > > > > > > > > > > > > > > > `KeyValueStore#put(key, value)` uses > the > > > context > > > > > > to augment > > > > > > > > > > > > > > > > > > > the record with timestamp information. > This > > > > > > relies on the > > > > > > > > > > > > > > > > > > > assumption that you would only call > > > "put()" from > > > > > > inside a > > > > > > > > > > > > > > > > > > > `Processor#process(key, value)` call in > > > which > > > > > > the record > > > > > > > > > > > > > > > > > > > being processed is the same record that > > > you're > > > > > > trying to put > > > > > > > > > > > > > > > > > > > into the store. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > If you were to call "put" from a > > > punctuator, or > > > > > > do a > > > > > > > > > > > > > > > > > > > `range()` query and then update one of > > > those > > > > > > records with > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug > on > > > your > > > > > > hands. Right > > > > > > > > > > > > > > > > > > > now, the Streams component that > actually > > > calls > > > > > > the Processor > > > > > > > > > > > > > > > > > > > takes care to set the right record > context > > > > > > before invoking > > > > > > > > > > > > > > > > > > > the method, and in the case of caching, > > > etc., it > > > > > > also takes > > > > > > > > > > > > > > > > > > > care to swap out the old context and > keep > > > it > > > > > > somewhere safe. > > > > > > > > > > > > > > > > > > > But when it comes to public API > Processors > > > > > > calling methods > > > > > > > > > > > > > > > > > > > on StateStores, there's no opportunity > for > > > any > > > > > > component to > > > > > > > > > > > > > > > > > > > make sure the context is always > correct. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > In the face of that situation, it > seemed > > > better > > > > > > to just move > > > > > > > > > > > > > > > > > > > in the direction of a "normal" data > store. > > > I.e., > > > > > > when you > > > > > > > > > > > > > > > > > > > use a HashMap or RocksDB or other > "state > > > > > > stores", you don't > > > > > > > > > > > > > > > > > > > expect them to automatically know extra > > > stuff > > > > > > about the > > > > > > > > > > > > > > > > > > > record you're storing. If you need > them to > > > know > > > > > > something, > > > > > > > > > > > > > > > > > > > you just put it in the value. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > All of that said, I'm just reasoning > from > > > first > > > > > > principles > > > > > > > > > > > > > > > > > > > here. To really know if this is a > mistake > > > or > > > > > > not, I need to > > > > > > > > > > > > > > > > > > > be in your place. So please push back > if > > > you > > > > > > think what I > > > > > > > > > > > > > > > > > > > said is nonsense. My personal plan was > to > > > keep > > > > > > an eye out > > > > > > > > > > > > > > > > > > > during the period where the old API was > > > still > > > > > > present, but > > > > > > > > > > > > > > > > > > > deprecated, to see if people were > > > struggling to > > > > > > use the new > > > > > > > > > > > > > > > > > > > API. If so, then we'd have a chance to > > > address > > > > > > it before > > > > > > > > > > > > > > > > > > > dropping the old API. But it's even > better > > > if > > > > > > you can help > > > > > > > > > > > > > > > > > > > think it through now. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It did also cross my mind to _not_ add > the > > > > > > > > > > > > > > > > > > > StateStoreContext, but just to > continue to > > > punt > > > > > > on the > > > > > > > > > > > > > > > > > > > question by just dropping in the new > > > > > > ProcessorContext to the > > > > > > > > > > > > > > > > > > > new init method. If StateStoreContext > > > seems too > > > > > > bold, we can > > > > > > > > > > > > > > > > > > > go that direction. But if we actually > add > > > some > > > > > > methods to > > > > > > > > > > > > > > > > > > > StateStoreContext, I'd like to be able > to > > > ensure > > > > > > they would > > > > > > > > > > > > > > > > > > > be well defined. I think the current > > > situation > > > > > > was more of > > > > > > > > > > > > > > > > > > > an oversight than a choice. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks again for your reply, > > > > > > > > > > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul > > > Whalen > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > John, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It's exciting to see this KIP head in > > > this > > > > > > direction! In > > > > > > > > > > > > the > > > > > > > > > > > > > > > last > > > > > > > > > > > > > > > > > year > > > > > > > > > > > > > > > > > > > or > > > > > > > > > > > > > > > > > > > > so I've tried to sketch out some > > > usability > > > > > > improvements for > > > > > > > > > > > > > > > custom > > > > > > > > > > > > > > > > > state > > > > > > > > > > > > > > > > > > > > stores, and I also ended up splitting > > > out the > > > > > > > > > > > > > StateStoreContext > > > > > > > > > > > > > > > from > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > ProcessorContext in an attempt to > > > facilitate > > > > > > what I was > > > > > > > > > > > > > doing. I > > > > > > > > > > > > > > > > > sort of > > > > > > > > > > > > > > > > > > > > abandoned it when I realized how > large > > > the > > > > > > ideal change > > > > > > > > > > > > might > > > > > > > > > > > > > > > have > > > > > > > > > > > > > > > > > to be, > > > > > > > > > > > > > > > > > > > > but it's great to see that there is > other > > > > > > interest in > > > > > > > > > > > > moving > > > > > > > > > > > > > in > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > > > > direction (from the folks that > matter :) > > > ). > > > > > > > > > > > > > > > > > > > > Having taken a stab at it myself, I > have > > > a > > > > > > comment/question > > > > > > > > > > > > > on > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > > > bullet > > > > > > > > > > > > > > > > > > > > about StateStoreContext: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It does *not* include anything > > > processor- or > > > > > > record- > > > > > > > > > > > > > specific, > > > > > > > > > > > > > > > like > > > > > > > > > > > > > > > > > > > > > `forward()` or any information > about > > > the > > > > > > "current" > > > > > > > > > > > > record, > > > > > > > > > > > > > > > which is > > > > > > > > > > > > > > > > > > > only a > > > > > > > > > > > > > > > > > > > > > well-defined in the context of the > > > > > > Processor. Processors > > > > > > > > > > > > > > > process > > > > > > > > > > > > > > > > > one > > > > > > > > > > > > > > > > > > > record > > > > > > > > > > > > > > > > > > > > > at a time, but state stores may be > > > used to > > > > > > store and > > > > > > > > > > > > fetch > > > > > > > > > > > > > many > > > > > > > > > > > > > > > > > > > records, so > > > > > > > > > > > > > > > > > > > > > there is no "current record". > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I totally agree that record-specific > or > > > > > > processor-specific > > > > > > > > > > > > > > > context > > > > > > > > > > > > > > > > > in a > > > > > > > > > > > > > > > > > > > > state store is often not well-defined > > > and it > > > > > > would be good > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > separate > > > > > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > > > > out, but sometimes it (at least > > > > > > record-specific context) is > > > > > > > > > > > > > > > actually > > > > > > > > > > > > > > > > > > > > useful, for example, passing the > record's > > > > > > timestamp through > > > > > > > > > > > > > to > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > underlying storage (or changelog > topic): > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121 > > > > > > > > > > > > > > > > > > > > You could have the writer client of > the > > > state > > > > > > store pass > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > through, > > > > > > > > > > > > > > > > > > > but > > > > > > > > > > > > > > > > > > > > it would be nice to be able to write > > > state > > > > > > stores where the > > > > > > > > > > > > > > > client > > > > > > > > > > > > > > > > > did > > > > > > > > > > > > > > > > > > > not > > > > > > > > > > > > > > > > > > > > have this responsibility. I'm not > sure > > > if the > > > > > > solution is > > > > > > > > > > > > > to add > > > > > > > > > > > > > > > > > some > > > > > > > > > > > > > > > > > > > > things back to StateStoreContext, or > > > make yet > > > > > > another > > > > > > > > > > > > context > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > > > > represents record-specific context > while > > > > > > inside a state > > > > > > > > > > > > > store. > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > > > Paul > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John > > > Roesler < > > > > > > > > > > > > > j...@vvcephei.org> > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hello all, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478 > > > forward > > > > > > over the last > > > > > > > > > > > > > year, > > > > > > > > > > > > > > > > > > > > > and I'm happy to say that we're > making > > > good > > > > > > progress now. > > > > > > > > > > > > > > > > > > > > > However, several issues with the > > > original > > > > > > design have > > > > > > > > > > > > come > > > > > > > > > > > > > > > > > > > > > to light. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The major changes: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We discovered that the original > plan > > > of just > > > > > > adding > > > > > > > > > > > > generic > > > > > > > > > > > > > > > > > > > > > parameters to ProcessorContext was > too > > > > > > disruptive, so we > > > > > > > > > > > > > are > > > > > > > > > > > > > > > > > > > > > now adding a new > api.ProcessorContext. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > That choice forces us to add a new > > > > > > StateStore.init method > > > > > > > > > > > > > > > > > > > > > for the new context, but > > > ProcessorContext > > > > > > really isn't > > > > > > > > > > > > > ideal > > > > > > > > > > > > > > > > > > > > > for state stores to begin with, so > I'm > > > > > > proposing a new > > > > > > > > > > > > > > > > > > > > > StateStoreContext for this > purpose. In > > > a > > > > > > nutshell, there > > > > > > > > > > > > > are > > > > > > > > > > > > > > > > > > > > > quite a few methods in > > > ProcessorContext that > > > > > > actually > > > > > > > > > > > > > should > > > > > > > > > > > > > > > > > > > > > never be called from inside a > > > StateStore. > > > > > > > > > > > > > > > > > > > > > Also, since there is a new > > > ProcessorContext > > > > > > interface, we > > > > > > > > > > > > > > > > > > > > > need a new MockProcessorContext > > > > > > implementation in the > > > > > > > > > > > > test- > > > > > > > > > > > > > > > > > > > > > utils module. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The changeset for the KIP document > is > > > here: > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10 > > > > > > > > > > > > > > > > > > > > > And the KIP itself is here: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API > > > > > > > > > > > > > > > > > > > > > If you have any concerns, please > let > > > me know! > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >