Thanks for this thought, Matthias! To be honest, it's bugged me quite a bit that _all_ the record information hasn't been an argument to `process`. I suppose I was trying to be conservative in this proposal, but then again, if we're adding new Processor and ProcessorContext interfaces, then this is the time to make such a change.
To be unambiguous, I think this is what we're talking about:

ProcessorContext:
* applicationId
* taskId
* appConfigs
* appConfigsWithPrefix
* keySerde
* valueSerde
* stateDir
* metrics
* schedule
* commit
* forward

StateStoreContext:
* applicationId
* taskId
* appConfigs
* appConfigsWithPrefix
* keySerde
* valueSerde
* stateDir
* metrics
* register

RecordContext:
* topic
* partition
* offset
* timestamp
* headers

Your proposal sounds good to me as-is. Just to cover the bases, though, I'm wondering if we should push the idea a little farther. Instead of decomposing key, value, and context, we could keep them all in one object, like this:

Record:
* key
* value
* topic
* partition
* offset
* timestamp
* headers

Then, we could have:

    Processor#process(Record)
    ProcessorContext#forward(Record, To)

Viewed from this perspective, a record has three properties that people may specify in their processors: key, value, and timestamp. We could deprecate `To#withTimestamp` and let people specify the timestamp along with the key and value when they forward a record. E.g.:

    RecordBuilder toForward = RecordBuilder.copy(record);
    toForward.withKey(newKey);
    toForward.withValue(newValue);
    toForward.withTimestamp(newTimestamp);
    Record newRecord = toForward.build();
    context.forward(newRecord, To.child("child1"));

Or, the more compact common case:

    current:  context.forward(key, "newValue");
    proposed: context.forward(copy(record).withValue("newValue").build());

It's slightly more verbose, but also more extensible. This would give us a clean path to add header support in the PAPI as well, simply by adding `withHeaders` to RecordBuilder. It's also more symmetrical, since the recipient of `forward` would just get the sent `Record`. Today, by contrast, the sender puts the timestamp in `To`, but the recipient gets it from its own `ProcessorContext`.

WDYT?
-John

On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
I think separating the different contexts makes sense.
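John's Record/RecordBuilder proposal above can be made concrete with a small, self-contained Java sketch. Everything in it is hypothetical: `Record`, `RecordBuilder`, and `SketchContext` only mirror the shape described in the email (copy an input record, override key, value, or timestamp, then forward the new record to a named child) and are not Kafka Streams classes. The plain child name stands in for `To.child("child1")`, and headers are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical immutable record bundling what the thread proposes:
// key, value, timestamp, plus input metadata (topic, partition, offset).
// Headers are omitted; a fuller design might add them via withHeaders.
final class Record<K, V> {
    final K key;
    final V value;
    final long timestamp;
    final String topic;
    final int partition;
    final long offset;

    Record(K key, V value, long timestamp, String topic, int partition, long offset) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical builder: copies an existing record, lets the processor
// override key, value, and timestamp, then builds a new immutable Record.
final class RecordBuilder<K, V> {
    private K key;
    private V value;
    private long timestamp;
    private final String topic;
    private final int partition;
    private final long offset;

    private RecordBuilder(Record<K, V> source) {
        this.key = source.key;
        this.value = source.value;
        this.timestamp = source.timestamp;
        this.topic = source.topic;
        this.partition = source.partition;
        this.offset = source.offset;
    }

    static <K, V> RecordBuilder<K, V> copy(Record<K, V> source) {
        return new RecordBuilder<>(source);
    }

    RecordBuilder<K, V> withKey(K newKey) { this.key = newKey; return this; }

    RecordBuilder<K, V> withValue(V newValue) { this.value = newValue; return this; }

    RecordBuilder<K, V> withTimestamp(long newTimestamp) { this.timestamp = newTimestamp; return this; }

    Record<K, V> build() {
        return new Record<>(key, value, timestamp, topic, partition, offset);
    }
}

// Hypothetical context that only remembers what was forwarded and to which
// child, so a recipient would see exactly the Record the sender built.
final class SketchContext<K, V> {
    final List<String> children = new ArrayList<>();
    final List<Record<K, V>> forwarded = new ArrayList<>();

    void forward(Record<K, V> record, String childName) {
        children.add(childName);
        forwarded.add(record);
    }
}
```

With these pieces, the compact case reads `ctx.forward(RecordBuilder.copy(input).withValue("newValue").build(), "child1")`. The input record is never modified, because `build()` always creates a new `Record`; that is what makes the "recipient gets the sent Record" symmetry easy to reason about.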
In fact, we could even go one step further: remove the record context from the processor context completely and add a third parameter, `process(key, value, recordContext)`. This would make it clear that the context is for the input record only, and it would not be possible to pass it to a `punctuate` callback.

For the stores and changelogging, I think there are two cases. (1) You use a plain key-value store. In this case, it seems you do not care about the timestamp, and thus do not care what timestamp is set on the changelog records. (We can set anything we want, as it's not relevant at all: the timestamp is ignored on read anyway.) (2) You do care about timestamps, and in this case you should use a TimestampedKeyValueStore. The passed timestamp will be set on the changelog records.

Thus, in both cases, access to the record context does not seem to be a requirement. And providing access to the processor context to, e.g., `forward()` or similar seems safe.

-Matthias

On 9/10/20 7:25 PM, John Roesler wrote:
Thanks for the reply, Paul!

I certainly intend to make sure that the changelogging layer continues to work the way it does now, by hook or by crook. I think the easiest path for me is to just "cheat" and get the real ProcessorContext into the ChangeLoggingStore implementation somehow. I'll tag you on the PR when I create it, so you have an opportunity to express a preference about the implementation choice, and maybe even compile/test against it to make sure your stuff still works.

Regarding this:

> we have an interest in making a state store with a richer way of querying its data (like perhaps getting all values associated with a secondary key), while still ultimately writing to the changelog topic for later restoration.

This is very intriguing to me.
On the side, I've been preparing a couple of ideas related to this topic. I don't think I have a coherent enough thought to even express it in a Jira right now, but when I do, I'll tag you on it also to see what you think.

Whenever you're ready to share the usability improvement ideas, I'm very interested to see what you've come up with.

Thanks,
-John

On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> when you use a HashMap or RocksDB or other "state stores", you don't expect them to automatically know extra stuff about the record you're storing.

> So, I don't think there is any reason we *can't* retain the record context in the StateStoreContext, and if any users came along with a clear use case I'd find that convincing.

I agree with the principle of being conservative with the StateStoreContext API. Regarding user expectations or a clear use case, the only counterpoint I would offer is that we sort of have that use case already, which is the example I gave of the change-logging store using the timestamp. I am curious whether this functionality will be retained when using built-in state stores, or whether a low-level processor will get a KeyValueStore that no longer writes to the changelog topic with the record's timestamp. While I personally don't care much about that functionality specifically, I have a general desire for custom state stores to easily do the things that built-in state stores do.

> It genuinely did not occur to me that users might be looking up and/or updating records of other keys from within a Processor.

I'm glad you said this, Sophie, because it gives me an opportunity to say that this is actually a *huge* use case for my team.
The state store usability improvements I was referring to in my previous message were about enabling the user to write custom stores while still easily hooking into the ability to write to a changelog topic. I think that is technically possible now, but I don't think it's trivial. Specifically, we have an interest in making a state store with a richer way of querying its data (like perhaps getting all values associated with a secondary key), while still ultimately writing to the changelog topic for later restoration.

We recognize that this use case throws away some of what Kafka Streams (especially the DSL) is good at, namely easy parallelizability by partitioning all processing by key, and that our business logic would completely fall apart if we were consuming from multi-partition topics with multiple consumers. But we have found the low-level Processor API to be good for the very simple stream processing primitives it provides: handling the plumbing of consuming from multiple Kafka topics and potentially updating persistent local state in a reliable way. That in itself has proven to be a worthwhile programming model.

Since I got off track a bit, let me summarize: I don't particularly care about the record context being available to state store implementations, and I think this KIP is headed in the right direction in that regard. But more generally, I wanted to express the importance of maintaining a powerful and flexible StateStore interface.

Thanks!
Paul

On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
Aha, I did misinterpret the example in your previous response regarding the range query after all. I thought you just meant a time-range query inside a punctuator.
It genuinely did not occur to me that users might be looking up and/or updating records of other keys from within a Processor. Sorry for being closed-minded.

I won't drag out this discussion any further by asking whether that might be a valid use case or just a lurking bug in itself :)

Thanks for humoring me. The current proposal for KIP-478 sounds good to me.

On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote:
Ah, thanks, Sophie,

I'm sorry for misinterpreting your response. Yes, we absolutely can and should clear the context before punctuating.

My secondary concern is maybe more far-fetched. I was thinking that inside process(key, value), a Processor might do a get/put of a _different_ key. Consider, for example, the way that Suppress processors work. When they get a record, they add it to the store and then do a range scan and possibly forward a _different_ record. Of course, this is an operation that is deeply coupled to the internals, and the Suppress processor accordingly does get access to the internal context so that it can set the context before forwarding.

Still, it seems like I've had a handful of conversations with people over the years in which they tell me they are using state stores in a way that transcends the "get and put the currently processing record" access pattern. I doubt that those folks would even have considered the possibility that the currently processing record's _context_ could pollute their state store operations, as I myself never gave it a second thought until the current conversation began.
In cases like that, we have actually set a trap for these people, and it seems better to dismantle the trap.

As you noted, really the only people who would be negatively impacted are people who implement their own state stores. These folks will get the deprecation warning and try to adapt their stores to the new interface. If they needed access to the record context, they would find it's now missing. They'd ask us about it, and we'd have the opportunity to explain the lurking bug that they have had in their stores all along, as well as the new recommended pattern (just pass everything you need in the value). If that's unsatisfying, _then_ we should consider amending the API.

Thanks,
-John

On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman wrote:
> Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it,

Sorry, this was poorly phrased; I definitely did not mean to imply that we should make the context modifiable by the Processors themselves. I meant that this should be handled by the internal processing framework that deals with passing records from one Processor to the next, setting the record context when a new record is picked up, invoking the punctuators, etc. I believe this all currently happens in the StreamTask?
It already can and does overwrite the record context as new records are processed, and it is also responsible for calling the punctuators, so it doesn't seem like a huge leap to just say "null out the current record before punctuating".

To clarify, I was never advocating or even considering giving the Processors write access to the record context. Sorry if my last message (or all of them) was misleading. I just wanted to point out that the punctuator concern is orthogonal to the question of whether we should include the record context in the StateStoreContext. It's definitely a real problem, but it's a problem that exists at the Processor level and not just the StateStore.

So, I don't think there is any reason we *can't* retain the record context in the StateStoreContext, and if any users came along with a clear use case I'd find that convincing. In the absence of any examples, the conservative approach sounds good to me.

If it turns out that someone did need the record context in their custom state store, I'm sure they'll submit a politely worded bug report alerting us that we broke their application.

On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:
Thanks, Sophie,

Yes, now that you point it out, I can see that the record context itself should be nulled out by Streams before invoking punctuators.
From that perspective, we don't need to think about the second-order problem of what's in the context for the state store when called from a punctuator.

Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it, but then all kinds of strange effects would ensue when downstream processors execute but the context is empty, etc. Better to just let the framework manage the record context and keep it read-only for Processors.

Reading between the lines of your last reply, it sounds like the disconnect may just have been a mutual misunderstanding about whether or not Processors currently have access to set the record context. Since they do not, if we wanted to add the record context to StateStoreContext in a well-defined way, we'd also have to add the ability for Processors to manipulate it. But then, we're just creating a side channel for Processors to pass some information in arguments to "put()" and other information implicitly through the context. It seems better to go for a single channel for now.

It sounds like you're basically in favor of the conservative approach, and you just wanted to understand the blockers that I implied. Does my clarification make sense?
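The division of responsibility John and Sophie converge on here (the framework sets the record context, Processors can only read it, and it is cleared before punctuation) can be sketched as follows. All names (`RecordMetadata`, `ReadOnlyContext`, `SketchProcessor`, `SketchTask`) are hypothetical stand-ins rather than StreamTask internals, and `Optional.empty()` plays the role of the nulled-out context.

```java
import java.util.Optional;

// Hypothetical metadata for the record currently being processed.
final class RecordMetadata {
    final String topic;
    final int partition;
    final long offset;

    RecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// What a Processor sees: a getter only, with no way to set the context.
interface ReadOnlyContext {
    Optional<RecordMetadata> recordMetadata();
}

interface SketchProcessor {
    void process(String key, String value, ReadOnlyContext context);

    void punctuate(long time, ReadOnlyContext context);
}

// Hypothetical stand-in for the framework loop. Only this class can change
// the current record context.
final class SketchTask implements ReadOnlyContext {
    private Optional<RecordMetadata> current = Optional.empty();
    private final SketchProcessor processor;

    SketchTask(SketchProcessor processor) {
        this.processor = processor;
    }

    @Override
    public Optional<RecordMetadata> recordMetadata() {
        return current;
    }

    void processRecord(String topic, int partition, long offset, String key, String value) {
        current = Optional.of(new RecordMetadata(topic, partition, offset));
        try {
            processor.process(key, value, this);
        } finally {
            current = Optional.empty(); // cleared once the record is done
        }
    }

    void punctuate(long time) {
        current = Optional.empty(); // no "current record" during punctuation
        processor.punctuate(time, this);
    }
}
```

Because `SketchTask` is the only writer, a Processor cannot leave a stale context behind, and a punctuator that asks for record metadata gets an explicit "absent" rather than the last record's values.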
Thanks,
-John

On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:
I was just thinking that the processor would null out the record context after it finished processing the record, so I'm not sure I follow why this would not be possible? AFAIK we never call a punctuator in the middle of processing a record through the topology, and even if we did, we still know when it is about to be called and could set it to null beforehand.

I'm not trying to advocate for it here; I'm in agreement that anything you want to access within the store can and should be accessed within the calling Processor/Punctuator before reaching the store. The "we can always add it later if necessary" argument is also pretty convincing. I'm just trying to understand why this wouldn't be possible.

FWIW, the question of "what is the current record in the context of a Punctuator" exists independently of whether we want to add this to the StateStoreContext or not. The full ProcessorContext, including the current record context, is already available within a Punctuator, so removing the current record context from the StateStoreContext does not solve the problem.
Users can, and have (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>), hit such subtle bugs without ever invoking a StateStore from their punctuator.

Again, I think I do agree that we should leave the current record context off of the StateStoreContext, but I don't think the Punctuator argument against it is very convincing. It sounds to me like we need to disallow access to the current record context from within the Punctuator, independent of anything to do with state stores.

On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
Thanks for the thoughts, Sophie.

I agree that the extra information could be useful. My only concern is that it doesn’t seem like we can actually supply that extra information correctly. So then we have a situation where the system offers useful API calls that are only correct in a narrow range of use cases. Outside of those use cases, you get incorrect behavior.

If it were possible to null out the context before you put a document to which the context doesn’t apply, then the concern would be mitigated.
But it would still be pretty weird from the perspective of the store that sometimes the context is populated and other times it’s null.

But that seems moot, since it doesn’t seem possible to null out the context. Only the Processor could know whether it’s about to put a document different from the context or not. And it would be inappropriate to offer a public ProcessorContext API to manage the record context.

Ultimately, it still seems like if you want to store headers, you can store them explicitly, right? That doesn’t seem onerous to me, and it seems better than relying on undefined or asymmetrical behavior in the store itself.

Anyway, I’m not saying that we couldn’t solve these problems, just that it seems we can be conservative and avoid them for now. If it turns out we really need to solve them, we can always do it later.

Thanks,
John

On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands.

Can you elaborate on this a bit?
I agree that the punctuator case is an obvious exception to the assumption that store invocations always have a corresponding "current record", but I don't understand the second example. Are you envisioning a scenario where the #process method performs a range query and then updates records? Or were you just giving another example of the punctuator case?

I only bring it up because I agree that the current record information could still be useful within the context of the store. As a non-user, my input on this definitely has limited value, but it just isn't striking me as obvious that we should remove access to the current record context from the state stores. If there is no current record, as in the punctuator case, we should just set the record context to null (or Optional.empty, etc.).

That said, the put() always has to come from somewhere, and that somewhere is always going to be either a Processor or a Punctuator, both of which will still have access to the full context. So additional info such as the timestamp can and should probably be supplied to the store before calling put(), rather than looked up by the store.
But I can see some other things being useful, for example the current record's headers. Maybe if/when we add better (or any) support for headers in state stores, this will be less true.

Of course, as John has made clear, it's pretty hard to judge without examples and more insight into what actually goes on within a custom state store.

On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
Hi Paul,

It's good to hear from you!

I'm glad you're in favor of the direction. Especially when it comes to public API and usability concerns, I tend to think that "the folks who matter" are actually the folks who have to use the APIs to accomplish real tasks. It can be hard for me to be sure I'm thinking clearly from that perspective.

Funny story: I also started down this road a couple of times already and backed out before the KIP because I was afraid of the scope of the proposal. Unfortunately, needing to make a new ProcessorContext kind of forced my hand.
I see you've called me out about the ChangeLogging stores :) In fact, I think these are the main/only reason that stores might really need to invoke "forward()". My secret plan was to cheat and either accomplish change-logging by a different mechanism than implementing the store interface, or by just breaking encapsulation to sneak the "real" ProcessorContext into the ChangeLogging stores. But those are all implementation details. I think the key question is whether anyone else has a store implementation that needs to call "forward()". It's not what you mentioned, but since you spoke up, I'll just ask: if you have a use case for calling "forward()" in a store, please share it.

Regarding the other record-specific context methods, I think you have a good point, but I also can't quite wrap my head around how we can actually guarantee them to work in general. Take the case you cited, where the implementation of `KeyValueStore#put(key, value)` uses the context to augment the record with timestamp information.
This relies on the assumption that you would only call "put()" from inside a `Processor#process(key, value)` call in which the record being processed is the same record that you're trying to put into the store.

If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands. Right now, the Streams component that actually calls the Processor takes care to set the right record context before invoking the method, and in the case of caching, etc., it also takes care to swap out the old context and keep it somewhere safe. But when it comes to public API Processors calling methods on StateStores, there's no opportunity for any component to make sure the context is always correct.

In the face of that situation, it seemed better to just move in the direction of a "normal" data store. I.e., when you use a HashMap or RocksDB or other "state stores", you don't expect them to automatically know extra stuff about the record you're storing. If you need them to know something, you just put it in the value.

All of that said, I'm just reasoning from first principles here. To really know if this is a mistake or not, I need to be in your place.
So please push back if you think what I said is nonsense. My personal plan was to keep an eye out during the period where the old API was still present, but deprecated, to see if people were struggling to use the new API. If so, then we'd have a chance to address it before dropping the old API. But it's even better if you can help think it through now.

It did also cross my mind to _not_ add the StateStoreContext, but to continue punting on the question by just passing the new ProcessorContext to the new init method. If StateStoreContext seems too bold, we can go that direction. But if we actually add some methods to StateStoreContext, I'd like to be able to ensure they would be well defined. I think the current situation was more of an oversight than a choice.

Thanks again for your reply,
-John

On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
John,

It's exciting to see this KIP head in this direction!
In the last year or so I've tried to sketch out some usability improvements for custom state stores, and I also ended up splitting out the StateStoreContext from the ProcessorContext in an attempt to facilitate what I was doing. I sort of abandoned it when I realized how large the ideal change might have to be, but it's great to see that there is other interest in moving in this direction (from the folks that matter :) ).

Having taken a stab at it myself, I have a comment/question on this bullet about StateStoreContext:

> It does *not* include anything processor- or record-specific, like `forward()` or any information about the "current" record, which is only well-defined in the context of the Processor. Processors process one record at a time, but state stores may be used to store and fetch many records, so there is no "current record".
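To see why a store has no well-defined "current record", consider the range-then-put pattern John describes elsewhere in this thread. The sketch below is hypothetical (`ImplicitContext`, `ImplicitTimestampStore`, and `StoreBugDemo` are illustrative names, not Kafka classes): a store that stamps every write with the timestamp of whichever record happens to be current ends up relabeling records it merely touched during a range scan.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for record context that a store could read implicitly.
final class ImplicitContext {
    long currentRecordTimestamp;
}

// Hypothetical store that stamps each write with the "current" record's
// timestamp instead of receiving a timestamp from the caller.
final class ImplicitTimestampStore {
    final TreeMap<String, String> values = new TreeMap<>();
    final TreeMap<String, Long> timestamps = new TreeMap<>();
    private final ImplicitContext context;

    ImplicitTimestampStore(ImplicitContext context) {
        this.context = context;
    }

    void put(String key, String value) {
        values.put(key, value);
        timestamps.put(key, context.currentRecordTimestamp); // implicit lookup
    }

    // Returns a copy, so callers may put() while iterating over the result.
    Map<String, String> range(String from, String to) {
        return new TreeMap<>(values.subMap(from, true, to, true));
    }
}

final class StoreBugDemo {
    static Map<String, Long> run() {
        ImplicitContext context = new ImplicitContext();
        ImplicitTimestampStore store = new ImplicitTimestampStore(context);

        context.currentRecordTimestamp = 10L; // processing the record for "a"
        store.put("a", "1");
        context.currentRecordTimestamp = 20L; // processing the record for "b"
        store.put("b", "2");

        // Processing a record for "c" at time 30 that updates *other* keys.
        context.currentRecordTimestamp = 30L;
        for (Map.Entry<String, String> entry : store.range("a", "b").entrySet()) {
            store.put(entry.getKey(), entry.getValue() + "-updated");
        }
        // Both "a" and "b" now carry timestamp 30, which belongs to "c".
        return store.timestamps;
    }
}
```

Nothing in the store can tell that the writes to "a" and "b" are not about the record being processed; only the calling Processor knows that.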
I totally agree that record-specific or processor-specific context in a state store is often not well-defined and it would be good to separate that out, but sometimes it (at least record-specific context) is actually useful, for example, passing the record's timestamp through to the underlying storage (or changelog topic):
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121

You could have the writer client of the state store pass this through, but it would be nice to be able to write state stores where the client did not have this responsibility. I'm not sure if the solution is to add some things back to StateStoreContext, or make yet another context that represents record-specific context while inside a state store.
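The alternative John and Matthias argue for elsewhere in this thread, where the writer passes the timestamp explicitly instead of the store looking it up, might look roughly like the sketch below. It is hypothetical: `TimestampedValue`, `ChangelogEntry`, and `ExplicitTimestampLoggingStore` are illustrative names in the spirit of Kafka Streams' `TimestampedKeyValueStore` (which stores a `ValueAndTimestamp`), not a reimplementation of `ChangeLoggingKeyValueBytesStore`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical value wrapper: the timestamp travels with the value.
final class TimestampedValue<V> {
    final V value;
    final long timestamp;

    TimestampedValue(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical changelog entry; a real store would produce a Kafka record.
final class ChangelogEntry<K, V> {
    final K key;
    final V value;
    final long timestamp;

    ChangelogEntry(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical change-logging store that never consults a record context:
// the changelog timestamp is whatever the caller passed in.
final class ExplicitTimestampLoggingStore<K, V> {
    private final Map<K, TimestampedValue<V>> inner = new HashMap<>();
    final List<ChangelogEntry<K, V>> changelog = new ArrayList<>();

    void put(K key, TimestampedValue<V> valueAndTimestamp) {
        inner.put(key, valueAndTimestamp);
        changelog.add(new ChangelogEntry<>(key, valueAndTimestamp.value, valueAndTimestamp.timestamp));
    }

    TimestampedValue<V> get(K key) {
        return inner.get(key);
    }
}
```

Whether the write comes from `process`, a punctuator, or a range scan, the caller decides which timestamp is correct, which is the "single channel" John prefers. The trade-off Paul raises is that every writer now carries that responsibility.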
Best,
Paul

On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
Hello all,

I've been slowly pushing KIP-478 forward over the last year, and I'm happy to say that we're making good progress now. However, several issues with the original design have come to light.

The major changes:

We discovered that the original plan of just adding generic parameters to ProcessorContext was too disruptive, so we are now adding a new api.ProcessorContext.

That choice forces us to add a new StateStore.init method for the new context, but ProcessorContext really isn't ideal for state stores to begin with, so I'm proposing a new StateStoreContext for this purpose. In a nutshell, there are quite a few methods in ProcessorContext that actually should never be called from inside a StateStore.

Also, since there is a new ProcessorContext interface, we need a new MockProcessorContext implementation in the test-utils module.
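One way to add a new `StateStore.init` for the new context without breaking existing custom stores is a default method that adapts the narrower context to the old one. The sketch below is a hypothetical, heavily simplified illustration (`LegacyProcessorContext`, `SketchStoreContext`, and `SketchStateStore` are made-up names with most methods omitted), not the KIP's actual interfaces. It also shows how a method that should never be called from a store, such as `forward()`, can fail loudly in the adapter.

```java
// Hypothetical, heavily simplified stand-in for the old ProcessorContext.
interface LegacyProcessorContext {
    String applicationId();

    void forward(Object key, Object value);
}

// Hypothetical store-only context: no forward(), no record metadata.
interface SketchStoreContext {
    String applicationId();
}

interface SketchStateStore {
    // Old entry point that existing custom stores already implement.
    @Deprecated
    void init(LegacyProcessorContext context);

    // New entry point. The default adapts the store context to the legacy
    // interface so existing implementations keep working unchanged.
    default void init(SketchStoreContext context) {
        init(new LegacyProcessorContext() {
            @Override
            public String applicationId() {
                return context.applicationId();
            }

            @Override
            public void forward(Object key, Object value) {
                // Stores should never forward; fail loudly instead of misbehaving.
                throw new UnsupportedOperationException("forward() is not available to state stores");
            }
        });
    }
}
```

An existing store that only implements the deprecated method still initializes through the new entry point, and any hidden dependency on `forward()` surfaces immediately as an exception rather than as silently wrong behavior.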
The changeset for the KIP document is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10

And the KIP itself is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API

If you have any concerns, please let me know!

Thanks,
-John