Thanks for this thought, Matthias!

To be honest, it's bugged me quite a bit that _all_ the
record information hasn't been an argument to `process`. I
suppose I was trying to be conservative in this proposal,
but then again, if we're adding new Processor and
ProcessorContext interfaces, then this is the time to make
such a change.

To be unambiguous, I think this is what we're talking about:
ProcessorContext:
* applicationId
* taskId
* appConfigs
* appConfigsWithPrefix
* keySerde
* valueSerde
* stateDir
* metrics
* schedule
* commit
* forward

StateStoreContext:
* applicationId
* taskId
* appConfigs
* appConfigsWithPrefix
* keySerde
* valueSerde
* stateDir
* metrics
* register


RecordContext:
* topic
* partition
* offset
* timestamp
* headers


Your proposal sounds good to me as-is. Just to cover the
bases, though, I'm wondering if we should push the idea just
a little farther. Instead of decomposing key, value, and
context, we could just keep them all in one object, like
this:

Record:
* key
* value
* topic
* partition
* offset
* timestamp
* headers

Then, we could have:
Processor#process(Record)
ProcessorContext#forward(Record, To)

Viewed from this perspective, a record has three properties
that people may specify in their processors: key, value, and
timestamp.

We could deprecate `To#withTimestamp` and enable people to
specify the timestamp along with the key and value when they
forward a record.

E.g.,
RecordBuilder toForward = RecordBuilder.copy(record);
toForward.withKey(newKey);
toForward.withValue(newValue);
toForward.withTimestamp(newTimestamp);
Record newRecord = toForward.build();
context.forward(newRecord, To.child("child1"));

Or, the more compact common case:
current:
 context.forward(key, "newValue");
proposed:
 context.forward(copy(record).withValue("newValue").build());


It's slightly more verbose, but also more extensible. This
would give us a clean path to add header support in PAPI as
well, simply by adding `withHeaders` in RecordBuilder.

It's also more symmetrical, since the recipient of `forward`
would just get the sent `Record`. Today, by contrast, the
sender puts the timestamp in `To`, but the recipient gets it
from its own `ProcessorContext`.
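
To make the shape of the idea concrete, here is a rough Java sketch of
what `Record` and `RecordBuilder` could look like. It is only an
illustration under my own assumptions: none of these classes exist in
Streams today, the field set just mirrors the list above, headers are
left out for brevity, and the builder is deliberately mutable so that
both the step-by-step and the chained style shown above would work.

```java
// Hypothetical sketch only: these types are NOT part of Kafka Streams.
// Headers are omitted to keep the example short.
final class Record<K, V> {
    final K key;
    final V value;
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    Record(K key, V value, String topic, int partition, long offset, long timestamp) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Mutable builder: each with* method updates the builder and returns it,
// so calls can be chained or issued one statement at a time.
final class RecordBuilder<K, V> {
    private K key;
    private V value;
    private long timestamp;
    // Source metadata is carried over unchanged from the copied record.
    private final String topic;
    private final int partition;
    private final long offset;

    private RecordBuilder(Record<K, V> source) {
        this.key = source.key;
        this.value = source.value;
        this.timestamp = source.timestamp;
        this.topic = source.topic;
        this.partition = source.partition;
        this.offset = source.offset;
    }

    static <K, V> RecordBuilder<K, V> copy(Record<K, V> source) {
        return new RecordBuilder<>(source);
    }

    RecordBuilder<K, V> withKey(K newKey) {
        this.key = newKey;
        return this;
    }

    RecordBuilder<K, V> withValue(V newValue) {
        this.value = newValue;
        return this;
    }

    RecordBuilder<K, V> withTimestamp(long newTimestamp) {
        this.timestamp = newTimestamp;
        return this;
    }

    Record<K, V> build() {
        return new Record<>(key, value, topic, partition, offset, timestamp);
    }
}

final class RecordSketchDemo {
    public static void main(String[] args) {
        Record<String, String> input =
            new Record<>("k", "oldValue", "input-topic", 0, 42L, 1000L);
        // The compact common case: copy the input and replace only the value.
        Record<String, String> output =
            RecordBuilder.copy(input).withValue("newValue").build();
        System.out.println(output.key + " -> " + output.value + " @ " + output.timestamp);
    }
}
```

Because `build()` creates a fresh `Record`, the input record is never
modified, and the recipient of `forward` would see exactly the key,
value, and timestamp that the sender chose.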

WDYT?
-John

On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
> I think separating the different contexts makes sense.
> 
> In fact, we could even go one step further and remove the record context
> from the processor context completely and we add a third parameter to
> `process(key, value, recordContext)`. This would make it clear that the
> context is for the input record only and it's not possible to pass it to
> a `punctuate` callback.
> 
> For the stores and changelogging: I think there are two cases. (1) You
> use a plain key-value store. For this case, it seems you do not care
> about the timestamp and thus do not care what timestamp is set in the
> changelog records. (We can set anything we want, as it's not relevant at
> all -- the timestamp is ignored on read anyway.) (2) The other case is
> that you do care about timestamps, and for this case you should use
> TimestampedKeyValueStore. The passed timestamp will be set on the
> changelog records for this case.
> 
> Thus, for both cases, accessing the record context does not seem to be
> a requirement. And providing access to the processor context to, e.g.,
> `forward()` or similar seems safe.
> 
> 
> -Matthias
> 
> On 9/10/20 7:25 PM, John Roesler wrote:
> > Thanks for the reply, Paul!
> > 
> > I certainly intend to make sure that the changelogging layer
> > continues to work the way it does now, by hook or by crook.
> > I think the easiest path for me is to just "cheat" and get
> > the real ProcessorContext into the ChangeLoggingStore
> > implementation somehow. I'll tag you on the PR when I create
> > it, so you have an opportunity to express a preference about
> > the implementation choice, and maybe even compile/test
> > against it to make sure your stuff still works.
> > 
> > Regarding this:
> > 
> > > we have an interest in making a state store with a richer
> > > way of querying its data (like perhaps getting all values
> > > associated with a secondary key), while still ultimately
> > > writing to the changelog topic for later restoration.
> > 
> > This is very intriguing to me. On the side, I've been
> > preparing a couple of ideas related to this topic. I don't
> > think I have a coherent enough thought to even express it in
> > a Jira right now, but when I do, I'll tag you on it also to
> > see what you think.
> > 
> > Whenever you're ready to share the usability improvement
> > ideas, I'm very interested to see what you've come up with.
> > 
> > Thanks,
> > -John
> > 
> > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > > > when you use a HashMap or RocksDB or other "state stores", you don't
> > > > expect them to automatically know extra stuff about the record you're
> > > > storing.
> > > 
> > > > > So, I don't think there is any reason we *can't* retain the record
> > > > > context in the StateStoreContext, and if any users came along with a
> > > > > clear use case I'd find that convincing.
> > > > 
> > > 
> > > > I agree with the principle of being conservative with the
> > > > StateStoreContext API.  Regarding user expectations or a clear use case,
> > > > the only counterpoint I would offer is that we sort of have that use case
> > > > already, which is the example I gave of the change logging store using
> > > > the timestamp.  I am curious if this functionality will be retained when
> > > > using built in state stores, or will a low-level processor get a
> > > > KeyValueStore that no longer writes to the changelog topic with the
> > > > record's timestamp.  While I personally don't care much about that
> > > > functionality specifically, I have a general desire for custom state
> > > > stores to easily do the things that built in state stores do.
> > > 
> > > > > It genuinely did not occur to me that users might be looking up and/or
> > > > > updating records of other keys from within a Processor.
> > > > 
> > > 
> > > > I'm glad you said this Sophie, because it gives me an opportunity to say
> > > > that this is actually a *huge* use case for my team.  The state store
> > > > usability improvements I was referring to in my previous message were
> > > > about enabling the user to write custom stores while still easily hooking
> > > > into the ability to write to a changelog topic.  I think that is
> > > > technically possible now, but I don't think it's trivial.  Specifically,
> > > > we have an interest in making a state store with a richer way of querying
> > > > its data (like perhaps getting all values associated with a secondary
> > > > key), while still ultimately writing to the changelog topic for later
> > > > restoration.
> > > 
> > > > We recognize that this use case throws away some of what Kafka Streams
> > > > (especially the DSL) is good at - easy parallelizability by partitioning
> > > > all processing by key - and that our business logic would completely fall
> > > > apart if we were consuming from multi-partition topics with multiple
> > > > consumers.  But we have found that using the low level processor API is
> > > > good for the very simple stream processing primitives it provides:
> > > > handling the plumbing of consuming from multiple Kafka topics and
> > > > potentially updating persistent local state in a reliable way.  That in
> > > > itself has proven to be a worthwhile programming model.
> > > 
> > > Since I got off track a bit, let me summarize: I don't particularly care
> > > about the record context being available to state store implementations,
> > > and I think this KIP is headed in the right direction in that regard.  But
> > > more generally, I wanted to express the importance of maintaining a
> > > powerful and flexible StateStore interface.
> > > 
> > > Thanks!
> > > Paul
> > > 
> > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman <sop...@confluent.io>
> > > wrote:
> > > 
> > > > > Aha, I did misinterpret the example in your previous response regarding
> > > > > the range query after all. I thought you just meant a time-range query
> > > > > inside a punctuator. It genuinely did not occur to me that users might
> > > > > be looking up and/or updating records of other keys from within a
> > > > > Processor. Sorry for being closed-minded.
> > > > > 
> > > > > I won't drag out this discussion any further by asking whether that
> > > > > might be a valid use case or just a lurking bug in itself :)
> > > > > 
> > > > > Thanks for humoring me. The current proposal for KIP-478 sounds good to
> > > > > me.
> > > > 
> > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> 
> > > > wrote:
> > > > 
> > > > > Ah, thanks Sophie,
> > > > > 
> > > > > > I'm sorry for misinterpreting your response. Yes, we
> > > > > absolutely can and should clear the context before
> > > > > punctuating.
> > > > > 
> > > > > My secondary concern is maybe more far-fetched. I was
> > > > > thinking that inside process(key,value), a Processor might
> > > > > do a get/put of a _different_ key. Consider, for example,
> > > > > the way that Suppress processors work. When they get a
> > > > > record, they add it to the store and then do a range scan
> > > > > and possibly forward a _different_ record. Of course, this
> > > > > is an operation that is deeply coupled to the internals, and
> > > > > the Suppress processor accordingly actually does get access
> > > > > to the internal context so that it can set the context
> > > > > before forwarding.
> > > > > 
> > > > > Still, it seems like I've had a handful of conversations
> > > > > with people over the years in which they tell me they are
> > > > > using state stores in a way that transcends the "get and put
> > > > > the currently processing record" access pattern. I doubt
> > > > > > that those folks would even have considered the possibility
> > > > > that the currently processing record's _context_ could
> > > > > pollute their state store operations, as I myself never gave
> > > > > it a second thought until the current conversation began. In
> > > > > cases like that, we have actually set a trap for these
> > > > > people, and it seems better to dismantle the trap.
> > > > > 
> > > > > As you noted, really the only people who would be negatively
> > > > > impacted are people who implement their own state stores.
> > > > > These folks will get the deprecation warning and try to
> > > > > adapt their stores to the new interface. If they needed
> > > > > access to the record context, they would find it's now
> > > > > missing. They'd ask us about it, and we'd have the ability
> > > > > to explain the lurking bug that they have had in their
> > > > > stores all along, as well as the new recommended pattern
> > > > > (just pass everything you need in the value). If that's
> > > > > unsatisfying, _then_ we should consider amending the API.
> > > > > 
> > > > > Thanks,
> > > > > -John
> > > > > 
> > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
> > > > > wrote:
> > > > > > > Regarding your first sentence, "...the processor would null
> > > > > > > out the record context...", this is not possible, since the
> > > > > > > processor doesn't have write access to the context. We could
> > > > > > > add it,
> > > > > > > 
> > > > > > 
> > > > > > > Sorry, this was poorly phrased, I definitely did not mean to imply
> > > > > > > that we should make the context modifiable by the Processors
> > > > > > > themselves. I meant this should be handled by the internal
> > > > > > > processing framework that deals with passing records from one
> > > > > > > Processor to the next, setting the record context when a new record
> > > > > > > is picked up, invoking the punctuators, etc. I believe this all
> > > > > > > currently happens in the StreamTask? It already can and does
> > > > > > > overwrite the record context as new records are processed, and is
> > > > > > > also responsible for calling the punctuators, so it doesn't seem
> > > > > > > like a huge leap to just say "null out the current record before
> > > > > > > punctuating"
> > > > > > 
> > > > > > > To clarify, I was never advocating or even considering to give the
> > > > > > > Processors write access to the record context. Sorry if my last
> > > > > > > message (or all of them) was misleading. I just wanted to point out
> > > > > > > that the punctuator concern is orthogonal to the question of whether
> > > > > > > we should include the record context in the StateStoreContext. It's
> > > > > > > definitely a real problem, but it's a problem that exists at the
> > > > > > > Processor level and not just the StateStore.
> > > > > > 
> > > > > > > So, I don't think there is any reason we *can't* retain the record
> > > > > > > context in the StateStoreContext, and if any users came along with a
> > > > > > > clear use case I'd find that convincing. In the absence of any
> > > > > > > examples, the conservative approach sounds good to me.
> > > > > > 
> > > > > > > If it turns out that someone did need the record context in their
> > > > > > > custom state store, I'm sure they'll submit a politely worded bug
> > > > > > > report alerting us that we broke their application.
> > > > > > 
> > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org>
> > > > > > > wrote:
> > > > > > > Thanks, Sophie,
> > > > > > > 
> > > > > > > Yes, now that you point it out, I can see that the record
> > > > > > > context itself should be nulled out by Streams before
> > > > > > > invoking punctuators. From that perspective, we don't need
> > > > > > > to think about the second-order problem of what's in the
> > > > > > > context for the state store when called from a punctuator.
> > > > > > > 
> > > > > > > Regarding your first sentence, "...the processor would null
> > > > > > > out the record context...", this is not possible, since the
> > > > > > > processor doesn't have write access to the context. We could
> > > > > > > add it, but then all kinds of strange effects would ensue
> > > > > > > when downstream processors execute but the context is empty,
> > > > > > > etc. Better to just let the framework manage the record
> > > > > > > context and keep it read-only for Processors.
> > > > > > > 
> > > > > > > > Reading between the lines of your last reply, it sounds like
> > > > > > > the disconnect may just have been a mutual misunderstanding
> > > > > > > about whether or not Processors currently have access to set
> > > > > > > the record context. Since they do not, if we wanted to add
> > > > > > > the record context to StateStoreContext in a well-defined
> > > > > > > way, we'd also have to add the ability for Processors to
> > > > > > > manipulate it. But then, we're just creating a side-channel
> > > > > > > for Processors to pass some information in arguments to
> > > > > > > "put()" and other information implicitly through the
> > > > > > > context. It seems better just to go for a single channel for
> > > > > > > now.
> > > > > > > 
> > > > > > > It sounds like you're basically in favor of the conservative
> > > > > > > approach, and you just wanted to understand the blockers
> > > > > > > that I implied. Does my clarification make sense?
> > > > > > > 
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > > 
> > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman
> > > > > > > wrote:
> > > > > > > > > I was just thinking that the processor would null out the record
> > > > > > > > > context after it finished processing the record, so I'm not sure
> > > > > > > > > I follow why this would not be possible? AFAIK we never call a
> > > > > > > > > punctuator in the middle of processing a record through the
> > > > > > > > > topology, and even if we did, we still know when it is about to
> > > > > > > > > be called and could set it to null beforehand.
> > > > > > > > 
> > > > > > > > > I'm not trying to advocate for it here, I'm in agreement that
> > > > > > > > > anything you want to access within the store can and should be
> > > > > > > > > accessed within the calling Processor/Punctuator before reaching
> > > > > > > > > the store. The "we can always add it later if necessary"
> > > > > > > > > argument is also pretty convincing. Just trying to understand
> > > > > > > > > why this wouldn't be possible.
> > > > > > > > 
> > > > > > > > > FWIW, the question of "what is the current record in the context
> > > > > > > > > of a Punctuator" exists independently of whether we want to add
> > > > > > > > > this to the StateStoreContext or not. The full ProcessorContext,
> > > > > > > > > including the current record context, is already available
> > > > > > > > > within a Punctuator, so removing the current record context from
> > > > > > > > > the StateStoreContext does not solve the problem. Users can --
> > > > > > > > > and have (see KAFKA-9584
> > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit such
> > > > > > > > > subtle bugs without ever invoking a StateStore from their
> > > > > > > > > punctuator.
> > > > > > > > 
> > > > > > > > > Again, I think I do agree that we should leave the current
> > > > > > > > > record context off of the StateStoreContext, but I don't think
> > > > > > > > > the Punctuator argument against it is very convincing. It sounds
> > > > > > > > > to me like we need to disallow access to the current record
> > > > > > > > > context from within the Punctuator, independent of anything to
> > > > > > > > > do with state stores.
> > > > > > > > 
> > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > > Thanks for the thoughts, Sophie.
> > > > > > > > > 
> > > > > > > > > > I agree that the extra information could be useful. My only
> > > > > > > > > > concern is that it doesn’t seem like we can actually supply
> > > > > > > > > > that extra information correctly. So, then we have a situation
> > > > > > > > > > where the system offers useful API calls that are only correct
> > > > > > > > > > in a narrow range of use cases. Outside of those use cases,
> > > > > > > > > > you get incorrect behavior.
> > > > > > > > > 
> > > > > > > > > > If it were possible to null out the context before you put a
> > > > > > > > > > document to which the context doesn’t apply, then the concern
> > > > > > > > > > would be mitigated. But it would still be pretty weird from
> > > > > > > > > > the perspective of the store that sometimes the context is
> > > > > > > > > > populated and other times, it’s null.
> > > > > > > > > 
> > > > > > > > > > But that seems moot, since it doesn’t seem possible to null
> > > > > > > > > > out the context. Only the Processor could know whether it’s
> > > > > > > > > > about to put a document different from the context or not. And
> > > > > > > > > > it would be inappropriate to offer a public ProcessorContext
> > > > > > > > > > api to manage the record context.
> > > > > > > > > 
> > > > > > > > > > Ultimately, it still seems like if you want to store headers,
> > > > > > > > > > you can store them explicitly, right? That doesn’t seem
> > > > > > > > > > onerous to me, and it kind of seems better than relying on
> > > > > > > > > > undefined or asymmetrical behavior in the store itself.
> > > > > > > > > 
> > > > > > > > > > Anyway, I’m not saying that we couldn’t solve these problems.
> > > > > > > > > > Just that it seems like we can be conservative and avoid them
> > > > > > > > > > for now. If it turns out we really need to solve them, we can
> > > > > > > > > > always do it later.
> > > > > > > > > Thanks,
> > > > > > > > > John
> > > > > > > > > 
> > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > `range()` query and then update one of those records with
> > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands.
> > > > > > > > > > 
> > > > > > > > > > > Can you elaborate on this a bit? I agree that the punctuator
> > > > > > > > > > > case is an obvious exception to the assumption that store
> > > > > > > > > > > invocations always have a corresponding "current record",
> > > > > > > > > > > but I don't understand the second example. Are you
> > > > > > > > > > > envisioning a scenario where the #process method performs a
> > > > > > > > > > > range query and then updates records? Or were you just
> > > > > > > > > > > giving another example of the punctuator case?
> > > > > > > > > > 
> > > > > > > > > > > I only bring it up because I agree that the current record
> > > > > > > > > > > information could still be useful within the context of the
> > > > > > > > > > > store. As a non-user my input on this definitely has limited
> > > > > > > > > > > value, but it just isn't striking me as obvious that we
> > > > > > > > > > > should remove access to the current record context from the
> > > > > > > > > > > state stores. If there is no current record, as in the
> > > > > > > > > > > punctuator case, we should just set the record context to
> > > > > > > > > > > null (or Optional.empty, etc).
> > > > > > > > > > 
> > > > > > > > > > > That said, the put() always has to come from somewhere, and
> > > > > > > > > > > that somewhere is always going to be either a Processor or a
> > > > > > > > > > > Punctuator, both of which will still have access to the full
> > > > > > > > > > > context. So additional info such as the timestamp can and
> > > > > > > > > > > should probably be supplied to the store before calling
> > > > > > > > > > > put(), rather than looked up by the store. But I can see
> > > > > > > > > > > some other things being useful, for example the current
> > > > > > > > > > > record's headers. Maybe if/when we add better (or any)
> > > > > > > > > > > support for headers in state stores this will be less true.
> > > > > > > > > > 
> > > > > > > > > > > Of course as John has made clear, it's pretty hard to judge
> > > > > > > > > > > without examples and more insight as to what actually goes
> > > > > > > > > > > on within a custom state store.
> > > > > > > > > > > 
> > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler
> > > > > > > > > > > <vvcep...@apache.org> wrote:
> > > > > > > > > > > Hi Paul,
> > > > > > > > > > > 
> > > > > > > > > > > It's good to hear from you!
> > > > > > > > > > > 
> > > > > > > > > > > > I'm glad you're in favor of the direction. Especially when
> > > > > > > > > > > > it comes to public API and usability concerns, I tend to
> > > > > > > > > > > > think that "the folks who matter" are actually the folks
> > > > > > > > > > > > who have to use the APIs to accomplish real tasks. It can
> > > > > > > > > > > > be hard for me to be sure I'm thinking clearly from that
> > > > > > > > > > > > perspective.
> > > > > > > > > > > 
> > > > > > > > > > > > Funny story, I also started down this road a couple of
> > > > > > > > > > > > times already and backed them out before the KIP because I
> > > > > > > > > > > > was afraid of the scope of the proposal. Unfortunately,
> > > > > > > > > > > > needing to make a new ProcessorContext kind of forced my
> > > > > > > > > > > > hand.
> > > > > > > > > > > 
> > > > > > > > > > > > I see you've called me out about the ChangeLogging stores
> > > > > > > > > > > > :) In fact, I think these are the main/only reason that
> > > > > > > > > > > > stores might really need to invoke "forward()". My secret
> > > > > > > > > > > > plan was to cheat and either accomplish change-logging by
> > > > > > > > > > > > a different mechanism than implementing the store
> > > > > > > > > > > > interface, or by just breaking encapsulation to sneak the
> > > > > > > > > > > > "real" ProcessorContext into the ChangeLogging stores. But
> > > > > > > > > > > > those are all implementation details. I think the key
> > > > > > > > > > > > question is whether anyone else has a store implementation
> > > > > > > > > > > > that needs to call "forward()". It's not what you
> > > > > > > > > > > > mentioned, but since you spoke up, I'll just ask: if you
> > > > > > > > > > > > have a use case for calling "forward()" in a store, please
> > > > > > > > > > > > share it.
> > > > > > > > > > > 
> > > > > > > > > > > > Regarding the other record-specific context methods, I
> > > > > > > > > > > > think you have a good point, but I also can't quite wrap
> > > > > > > > > > > > my head around how we can actually guarantee it to work in
> > > > > > > > > > > > general. For example, the case you cited, where the
> > > > > > > > > > > > implementation of `KeyValueStore#put(key, value)` uses the
> > > > > > > > > > > > context to augment the record with timestamp information.
> > > > > > > > > > > > This relies on the assumption that you would only call
> > > > > > > > > > > > "put()" from inside a `Processor#process(key, value)` call
> > > > > > > > > > > > in which the record being processed is the same record
> > > > > > > > > > > > that you're trying to put into the store.
> > > > > > > > > > > 
> > > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > > `range()` query and then update one of those records with
> > > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands. Right
> > > > > > > > > > > > now, the Streams component that actually calls the
> > > > > > > > > > > > Processor takes care to set the right record context
> > > > > > > > > > > > before invoking the method, and in the case of caching,
> > > > > > > > > > > > etc., it also takes care to swap out the old context and
> > > > > > > > > > > > keep it somewhere safe. But when it comes to public API
> > > > > > > > > > > > Processors calling methods on StateStores, there's no
> > > > > > > > > > > > opportunity for any component to make sure the context is
> > > > > > > > > > > > always correct.
> > > > > > > > > > > 
> > > > > > > > > > > > In the face of that situation, it seemed better to just
> > > > > > > > > > > > move in the direction of a "normal" data store. I.e., when
> > > > > > > > > > > > you use a HashMap or RocksDB or other "state stores", you
> > > > > > > > > > > > don't expect them to automatically know extra stuff about
> > > > > > > > > > > > the record you're storing. If you need them to know
> > > > > > > > > > > > something, you just put it in the value.
> > > > > > > > > > > 
> > > > > > > > > > > > All of that said, I'm just reasoning from first principles
> > > > > > > > > > > > here. To really know if this is a mistake or not, I need
> > > > > > > > > > > > to be in your place. So please push back if you think what
> > > > > > > > > > > > I said is nonsense. My personal plan was to keep an eye
> > > > > > > > > > > > out during the period where the old API was still present,
> > > > > > > > > > > > but deprecated, to see if people were struggling to use
> > > > > > > > > > > > the new API. If so, then we'd have a chance to address it
> > > > > > > > > > > > before dropping the old API. But it's even better if you
> > > > > > > > > > > > can help think it through now.
> > > > > > > > > > > 
> > > > > > > > > > > > It did also cross my mind to _not_ add the
> > > > > > > > > > > > StateStoreContext, but just to continue to punt on the
> > > > > > > > > > > > question by just dropping in the new ProcessorContext to
> > > > > > > > > > > > the new init method. If StateStoreContext seems too bold,
> > > > > > > > > > > > we can go that direction. But if we actually add some
> > > > > > > > > > > > methods to StateStoreContext, I'd like to be able to
> > > > > > > > > > > > ensure they would be well defined. I think the current
> > > > > > > > > > > > situation was more of an oversight than a choice.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks again for your reply,
> > > > > > > > > > > -John
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > > > > > > > John,
> > > > > > > > > > > > 
> > > > > > > > > > > > > It's exciting to see this KIP head in this direction!  In
> > > > > > > > > > > > > the last year or so I've tried to sketch out some
> > > > > > > > > > > > > usability improvements for custom state stores, and I also
> > > > > > > > > > > > > ended up splitting out the StateStoreContext from the
> > > > > > > > > > > > > ProcessorContext in an attempt to facilitate what I was
> > > > > > > > > > > > > doing.  I sort of abandoned it when I realized how large
> > > > > > > > > > > > > the ideal change might have to be, but it's great to see
> > > > > > > > > > > > > that there is other interest in moving in this direction
> > > > > > > > > > > > > (from the folks that matter :) ).
> > > > > > > > > > > > 
> > > > > > > > > > > > > Having taken a stab at it myself, I have a
> > > > > > > > > > > > > comment/question on this bullet about StateStoreContext:
> > > > > > > > > > > > 
> > > > > > > > > > > > > > It does *not* include anything processor- or
> > > > > > > > > > > > > > record-specific, like `forward()` or any information
> > > > > > > > > > > > > > about the "current" record, which is only well-defined
> > > > > > > > > > > > > > in the context of the Processor. Processors process one
> > > > > > > > > > > > > > record at a time, but state stores may be used to store
> > > > > > > > > > > > > > and fetch many records, so there is no "current record".
> > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > > I totally agree that record-specific or processor-specific
> > > > > > > > > > > > > context in a state store is often not well-defined and it
> > > > > > > > > > > > > would be good to separate that out, but sometimes it (at
> > > > > > > > > > > > > least record-specific context) is actually useful, for
> > > > > > > > > > > > > example, passing the record's timestamp through to the
> > > > > > > > > > > > > underlying storage (or changelog topic):
> > > > > > > > > > > > > 
> > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > > > > You could have the writer client of the state store pass
> > > > this
> > > > > > > > > through,
> > > > > > > > > > > but
> > > > > > > > > > > > it would be nice to be able to write state stores where 
> > > > > > > > > > > > the
> > > > > > > client
> > > > > > > > > did
> > > > > > > > > > > not
> > > > > > > > > > > > have this responsibility.  I'm not sure if the solution 
> > > > > > > > > > > > is
> > > > > to add
> > > > > > > > > some
> > > > > > > > > > > > things back to StateStoreContext, or make yet another
> > > > context
> > > > > > > that
> > > > > > > > > > > > represents record-specific context while inside a state
> > > > > store.
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Paul
> > > > > > > > > > > > 
> > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
> > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > I've been slowly pushing KIP-478 forward over the last
> > > > > > > > > > > > > > year, and I'm happy to say that we're making good
> > > > > > > > > > > > > > progress now. However, several issues with the original
> > > > > > > > > > > > > > design have come to light.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > The major changes:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > We discovered that the original plan of just adding
> > > > > > > > > > > > > > generic parameters to ProcessorContext was too
> > > > > > > > > > > > > > disruptive, so we are now adding a new
> > > > > > > > > > > > > > api.ProcessorContext.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > That choice forces us to add a new StateStore.init
> > > > > > > > > > > > > > method for the new context, but ProcessorContext really
> > > > > > > > > > > > > > isn't ideal for state stores to begin with, so I'm
> > > > > > > > > > > > > > proposing a new StateStoreContext for this purpose. In a
> > > > > > > > > > > > > > nutshell, there are quite a few methods in
> > > > > > > > > > > > > > ProcessorContext that actually should never be called
> > > > > > > > > > > > > > from inside a StateStore.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > Also, since there is a new ProcessorContext interface,
> > > > > > > > > > > > > > we need a new MockProcessorContext implementation in the
> > > > > > > > > > > > > > test-utils module.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > The changeset for the KIP document is here:
> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > If you have any concerns, please let me know!
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
