Aha, I did misinterpret the example in your previous response regarding the range query after all. I thought you just meant a time-range query inside a punctuator. It genuinely did not occur to me that users might be looking up and/or updating records of other keys from within a Processor. Sorry for being closed-minded.
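To make the scenario above concrete, here is a minimal sketch using hypothetical stand-in classes (`RecordContext`, `TimestampingStore`, `process` and `runScenario` are invented for illustration and are not the Kafka Streams API). The store implicitly stamps each `put()` with the timestamp from a framework-managed "current record" context, and the processor, much like Suppress, also updates a key other than the one being processed, so that other key silently picks up the wrong timestamp.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins, NOT the Kafka Streams API: they only model the
// "current record context" problem discussed in this thread.
public class ContextTrapSketch {

    /** Framework-managed "current record" context; user code only reads it. */
    static final class RecordContext {
        private long timestamp;

        long timestamp() { return timestamp; }

        void setTimestamp(long ts) { this.timestamp = ts; } // the framework calls this per record
    }

    /** A custom store that implicitly stamps every put() with the context timestamp. */
    static final class TimestampingStore {
        private final RecordContext context;
        final Map<String, String> values = new HashMap<>();
        final Map<String, Long> writeTimestamps = new HashMap<>();

        TimestampingStore(RecordContext context) { this.context = context; }

        void put(String key, String value) {
            values.put(key, value);
            // Silently assumes `key` belongs to the record currently being processed.
            writeTimestamps.put(key, context.timestamp());
        }
    }

    /** Like Suppress, this processor may write a key other than the one it is processing. */
    static void process(TimestampingStore store, String key, String value) {
        store.put(key, value);
        if (key.equals("a")) {
            store.put("b", "touched-by-a"); // "b" gets stamped with "a"'s timestamp
        }
    }

    /** Simulates the framework setting the context before each process() call. */
    static TimestampingStore runScenario() {
        RecordContext ctx = new RecordContext();
        TimestampingStore store = new TimestampingStore(ctx);
        ctx.setTimestamp(100L);
        process(store, "b", "original");
        ctx.setTimestamp(200L);
        process(store, "a", "value-a");
        return store;
    }

    public static void main(String[] args) {
        TimestampingStore store = runScenario();
        // Prints 200: "b" now carries the timestamp of record "a", not its own.
        System.out.println(store.writeTimestamps.get("b"));
    }
}
```

Nothing fails loudly here; the store simply records a plausible but wrong timestamp for "b", which is why this kind of bug is easy to miss.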
I won't drag out this discussion any further by asking whether that might be a valid use case or just a lurking bug in itself :) Thanks for humoring me. The current proposal for KIP-478 sounds good to me.

On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote:
> Ah, thanks Sophie,
>
> I'm sorry for misinterpreting your response. Yes, we absolutely can and should clear the context before punctuating.
>
> My secondary concern is maybe more far-fetched. I was thinking that inside process(key,value), a Processor might do a get/put of a _different_ key. Consider, for example, the way that Suppress processors work. When they get a record, they add it to the store and then do a range scan and possibly forward a _different_ record. Of course, this is an operation that is deeply coupled to the internals, and the Suppress processor accordingly actually does get access to the internal context so that it can set the context before forwarding.
>
> Still, it seems like I've had a handful of conversations with people over the years in which they tell me they are using state stores in a way that transcends the "get and put the currently processing record" access pattern. I doubt that those folks would even have considered the possibility that the currently processing record's _context_ could pollute their state store operations, as I myself never gave it a second thought until the current conversation began. In cases like that, we have actually set a trap for these people, and it seems better to dismantle the trap.
>
> As you noted, really the only people who would be negatively impacted are people who implement their own state stores. These folks will get the deprecation warning and try to adapt their stores to the new interface. If they needed access to the record context, they would find it's now missing.
> They'd ask us about it, and we'd have the ability to explain the lurking bug that they have had in their stores all along, as well as the new recommended pattern (just pass everything you need in the value). If that's unsatisfying, _then_ we should consider amending the API.
>
> Thanks,
> -John
>
> On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman wrote:
> > > Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it,
> >
> > Sorry, this was poorly phrased; I definitely did not mean to imply that we should make the context modifiable by the Processors themselves. I meant this should be handled by the internal processing framework that deals with passing records from one Processor to the next, setting the record context when a new record is picked up, invoking the punctuators, etc. I believe this all currently happens in the StreamTask? It already can and does overwrite the record context as new records are processed, and is also responsible for calling the punctuators, so it doesn't seem like a huge leap to just say "null out the current record before punctuating".
> >
> > To clarify, I was never advocating or even considering giving the Processors write access to the record context. Sorry if my last message (or all of them) was misleading. I just wanted to point out that the punctuator concern is orthogonal to the question of whether we should include the record context in the StateStoreContext. It's definitely a real problem, but it's a problem that exists at the Processor level and not just the StateStore.
> >
> > So, I don't think there is any reason we *can't* retain the record context in the StateStoreContext, and if any users came along with a clear use case I'd find that convincing.
> > In the absence of any examples, the conservative approach sounds good to me.
> >
> > If it turns out that someone did need the record context in their custom state store, I'm sure they'll submit a politely worded bug report alerting us that we broke their application.
> >
> > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:
> > > Thanks, Sophie,
> > >
> > > Yes, now that you point it out, I can see that the record context itself should be nulled out by Streams before invoking punctuators. From that perspective, we don't need to think about the second-order problem of what's in the context for the state store when called from a punctuator.
> > >
> > > Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it, but then all kinds of strange effects would ensue when downstream processors execute but the context is empty, etc. Better to just let the framework manage the record context and keep it read-only for Processors.
> > >
> > > Reading between the lines of your last reply, it sounds like the disconnect may just have been a mutual misunderstanding about whether or not Processors currently have access to set the record context. Since they do not, if we wanted to add the record context to StateStoreContext in a well-defined way, we'd also have to add the ability for Processors to manipulate it. But then, we're just creating a side-channel for Processors to pass some information in arguments to "put()" and other information implicitly through the context. It seems better just to go for a single channel for now.
> > >
> > > It sounds like you're basically in favor of the conservative approach, and you just wanted to understand the blockers that I implied.
> > > Does my clarification make sense?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:
> > > > I was just thinking that the processor would null out the record context after it finished processing the record, so I'm not sure I follow why this would not be possible? AFAIK we never call a punctuator in the middle of processing a record through the topology, and even if we did, we still know when it is about to be called and could set it to null beforehand.
> > > >
> > > > I'm not trying to advocate for it here; I'm in agreement that anything you want to access within the store can and should be accessed within the calling Processor/Punctuator before reaching the store. The "we can always add it later if necessary" argument is also pretty convincing. Just trying to understand why this wouldn't be possible.
> > > >
> > > > FWIW, the question of "what is the current record in the context of a Punctuator" exists independently of whether we want to add this to the StateStoreContext or not. The full ProcessorContext, including the current record context, is already available within a Punctuator, so removing the current record context from the StateStoreContext does not solve the problem. Users can -- and have (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit such subtle bugs without ever invoking a StateStore from their punctuator.
> > > >
> > > > Again, I think I do agree that we should leave the current record context off of the StateStoreContext, but I don't think the Punctuator argument against it is very convincing.
> > > > It sounds to me like we need to disallow access to the current record context from within the Punctuator, independent of anything to do with state stores.
> > > >
> > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > Thanks for the thoughts, Sophie.
> > > > >
> > > > > I agree that the extra information could be useful. My only concern is that it doesn’t seem like we can actually supply that extra information correctly. So, then we have a situation where the system offers useful API calls that are only correct in a narrow range of use cases. Outside of those use cases, you get incorrect behavior.
> > > > >
> > > > > If it were possible to null out the context before you put a document to which the context doesn’t apply, then the concern would be mitigated. But it would still be pretty weird from the perspective of the store that sometimes the context is populated and other times, it’s null.
> > > > >
> > > > > But that seems moot, since it doesn’t seem possible to null out the context. Only the Processor could know whether it’s about to put a document different from the context or not. And it would be inappropriate to offer a public ProcessorContext API to manage the record context.
> > > > >
> > > > > Ultimately, it still seems like if you want to store headers, you can store them explicitly, right? That doesn’t seem onerous to me, and it kind of seems better than relying on undefined or asymmetrical behavior in the store itself.
> > > > >
> > > > > Anyway, I’m not saying that we couldn’t solve these problems. Just that it seems we can be conservative and avoid them for now. If it turns out we really need to solve them, we can always do it later.
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > > > If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands.
> > > > > >
> > > > > > Can you elaborate on this a bit? I agree that the punctuator case is an obvious exception to the assumption that store invocations always have a corresponding "current record", but I don't understand the second example. Are you envisioning a scenario where the #process method performs a range query and then updates records? Or were you just giving another example of the punctuator case?
> > > > > >
> > > > > > I only bring it up because I agree that the current record information could still be useful within the context of the store. As a non-user, my input on this definitely has limited value, but it just isn't striking me as obvious that we should remove access to the current record context from the state stores. If there is no current record, as in the punctuator case, we should just set the record context to null (or Optional.empty, etc).
> > > > > >
> > > > > > That said, the put() always has to come from somewhere, and that somewhere is always going to be either a Processor or a Punctuator, both of which will still have access to the full context. So additional info such as the timestamp can and should probably be supplied to the store before calling put(), rather than looked up by the store. But I can see some other things being useful, for example the current record's headers.
> > > > > > Maybe if/when we add better (or any) support for headers in state stores this will be less true.
> > > > > >
> > > > > > Of course, as John has made clear, it's pretty hard to judge without examples and more insight as to what actually goes on within a custom state store.
> > > > > >
> > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > Hi Paul,
> > > > > > >
> > > > > > > It's good to hear from you!
> > > > > > >
> > > > > > > I'm glad you're in favor of the direction. Especially when it comes to public API and usability concerns, I tend to think that "the folks who matter" are actually the folks who have to use the APIs to accomplish real tasks. It can be hard for me to be sure I'm thinking clearly from that perspective.
> > > > > > >
> > > > > > > Funny story, I also started down this road a couple of times already and backed them out before the KIP because I was afraid of the scope of the proposal. Unfortunately, needing to make a new ProcessorContext kind of forced my hand.
> > > > > > >
> > > > > > > I see you've called me out about the ChangeLogging stores :) In fact, I think these are the main/only reason that stores might really need to invoke "forward()". My secret plan was to cheat and either accomplish change-logging by a different mechanism than implementing the store interface, or by just breaking encapsulation to sneak the "real" ProcessorContext into the ChangeLogging stores. But those are all implementation details. I think the key question is whether anyone else has a store implementation that needs to call "forward()".
> > > > > > > It's not what you mentioned, but since you spoke up, I'll just ask: if you have a use case for calling "forward()" in a store, please share it.
> > > > > > >
> > > > > > > Regarding the other record-specific context methods, I think you have a good point, but I also can't quite wrap my head around how we can actually guarantee it to work in general. For example, the case you cited, where the implementation of `KeyValueStore#put(key, value)` uses the context to augment the record with timestamp information. This relies on the assumption that you would only call "put()" from inside a `Processor#process(key, value)` call in which the record being processed is the same record that you're trying to put into the store.
> > > > > > >
> > > > > > > If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands. Right now, the Streams component that actually calls the Processor takes care to set the right record context before invoking the method, and in the case of caching, etc., it also takes care to swap out the old context and keep it somewhere safe. But when it comes to public API Processors calling methods on StateStores, there's no opportunity for any component to make sure the context is always correct.
> > > > > > >
> > > > > > > In the face of that situation, it seemed better to just move in the direction of a "normal" data store. I.e., when you use a HashMap or RocksDB or other "state stores", you don't expect them to automatically know extra stuff about the record you're storing.
> > > > > > > If you need them to know something, you just put it in the value.
> > > > > > >
> > > > > > > All of that said, I'm just reasoning from first principles here. To really know if this is a mistake or not, I need to be in your place. So please push back if you think what I said is nonsense. My personal plan was to keep an eye out during the period where the old API was still present, but deprecated, to see if people were struggling to use the new API. If so, then we'd have a chance to address it before dropping the old API. But it's even better if you can help think it through now.
> > > > > > >
> > > > > > > It did also cross my mind to _not_ add the StateStoreContext, but just to continue to punt on the question by just dropping in the new ProcessorContext to the new init method. If StateStoreContext seems too bold, we can go that direction. But if we actually add some methods to StateStoreContext, I'd like to be able to ensure they would be well defined. I think the current situation was more of an oversight than a choice.
> > > > > > >
> > > > > > > Thanks again for your reply,
> > > > > > > -John
> > > > > > >
> > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > > > John,
> > > > > > > >
> > > > > > > > It's exciting to see this KIP head in this direction! In the last year or so I've tried to sketch out some usability improvements for custom state stores, and I also ended up splitting out the StateStoreContext from the ProcessorContext in an attempt to facilitate what I was doing.
> > > > > > > > I sort of abandoned it when I realized how large the ideal change might have to be, but it's great to see that there is other interest in moving in this direction (from the folks that matter :) ).
> > > > > > > >
> > > > > > > > Having taken a stab at it myself, I have a comment/question on this bullet about StateStoreContext:
> > > > > > > >
> > > > > > > > > It does *not* include anything processor- or record-specific, like `forward()` or any information about the "current" record, which is only well-defined in the context of the Processor. Processors process one record at a time, but state stores may be used to store and fetch many records, so there is no "current record".
> > > > > > > >
> > > > > > > > I totally agree that record-specific or processor-specific context in a state store is often not well-defined and it would be good to separate that out, but sometimes it (at least record-specific context) is actually useful, for example, passing the record's timestamp through to the underlying storage (or changelog topic):
> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > >
> > > > > > > > You could have the writer client of the state store pass this through, but it would be nice to be able to write state stores where the client did not have this responsibility.
> > > > > > > > I'm not sure if the solution is to add some things back to StateStoreContext, or make yet another context that represents record-specific context while inside a state store.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Paul
> > > > > > > >
> > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > I've been slowly pushing KIP-478 forward over the last year, and I'm happy to say that we're making good progress now. However, several issues with the original design have come to light.
> > > > > > > > >
> > > > > > > > > The major changes:
> > > > > > > > >
> > > > > > > > > We discovered that the original plan of just adding generic parameters to ProcessorContext was too disruptive, so we are now adding a new api.ProcessorContext.
> > > > > > > > >
> > > > > > > > > That choice forces us to add a new StateStore.init method for the new context, but ProcessorContext really isn't ideal for state stores to begin with, so I'm proposing a new StateStoreContext for this purpose. In a nutshell, there are quite a few methods in ProcessorContext that actually should never be called from inside a StateStore.
> > > > > > > > >
> > > > > > > > > Also, since there is a new ProcessorContext interface, we need a new MockProcessorContext implementation in the test-utils module.
> > > > > > > > >
> > > > > > > > > The changeset for the KIP document is here:
> > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > >
> > > > > > > > > And the KIP itself is here:
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > >
> > > > > > > > > If you have any concerns, please let me know!
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > -John
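For comparison, a minimal sketch of the conservative direction the thread settles on ("just pass everything you need in the value"), again using hypothetical classes rather than the actual KIP-478 interfaces (`SimpleStoreContext`, `TimestampedValue`, `PlainStore` and `runScenario` are invented for illustration). The store's init context exposes nothing record-specific, so the caller decides which timestamp belongs to which key, and updating a different key no longer borrows the current record's metadata.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, NOT the KIP-478 interfaces: the store's context has no
// "current record", so anything record-specific must travel in the value.
public class ExplicitValueSketch {

    /** Narrow, store-facing context: no forward(), no record timestamp or headers. */
    interface SimpleStoreContext {
        String applicationId();
    }

    /** Value wrapper that carries the timestamp explicitly. */
    static final class TimestampedValue {
        final String value;
        final long timestamp;

        TimestampedValue(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Behaves like a HashMap: it only knows what it is given. */
    static final class PlainStore {
        private final Map<String, TimestampedValue> data = new HashMap<>();
        private String appId;

        void init(SimpleStoreContext context) {
            this.appId = context.applicationId(); // nothing record-specific is available here
        }

        void put(String key, TimestampedValue value) { data.put(key, value); }

        Optional<TimestampedValue> get(String key) { return Optional.ofNullable(data.get(key)); }
    }

    /** The caller (a Processor or punctuator) chooses each key's timestamp explicitly. */
    static PlainStore runScenario() {
        PlainStore store = new PlainStore();
        store.init(() -> "example-app");
        store.put("b", new TimestampedValue("original", 100L));

        // While processing record "a" (timestamp 200), also update "b".
        long bTimestamp = store.get("b").map(v -> v.timestamp).orElse(0L);
        store.put("a", new TimestampedValue("value-a", 200L));
        // Keeping b's own timestamp is a deliberate choice; 200 would also be explicit.
        store.put("b", new TimestampedValue("touched-by-a", bTimestamp));
        return store;
    }

    public static void main(String[] args) {
        PlainStore store = runScenario();
        System.out.println(store.get("b").get().timestamp);
    }
}
```

The trade-off is the one discussed above: the caller carries a little more responsibility, but the store's behavior no longer depends on whether a "current record" happens to exist or to match the key being written.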