>
> Does my reply address your concerns?

Yes; also, I definitely misread part of the proposal earlier and thought
you had put the timestamp field in RecordMetadata. Sorry for not giving
things a closer look before responding! I'm not sure my original message
made much sense given the misunderstanding, but thanks for responding
anyway :P

Having given the proposal a second pass, I agree, it's very elegant. +1

On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks for the reply, Sophie,
>
> I think I may have summarized too much in my prior reply.
>
> In the currently proposed KIP, any caller of forward() must
> supply a Record, which consists of:
> * key
> * value
> * timestamp
> * headers (with a convenience constructor that sets empty
> headers)
>
> These aren't what I was referring to as potentially being
> undefined downstream, since thanks to the introduction of
> Record, they are, as you're advocating, required to be
> defined everywhere, even when forwarding from a punctuator.
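
A minimal sketch of the Record shape listed above, assuming a simplified
headers type (a plain Map rather than Kafka's Headers). RecordSketch and
its nested Record are stand-ins made up for illustration, not the classes
from the KIP or the PR:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative stand-in only; not the actual Kafka Streams Record class.
class RecordSketch {

    public static final class Record<K, V> {
        private final K key;
        private final V value;
        // A primitive long cannot be "undefined", which is the point made above.
        private final long timestamp;
        // Assumption: headers modeled as a simple map for brevity.
        private final Map<String, byte[]> headers;

        // Full constructor: the caller supplies all four fields.
        public Record(K key, V value, long timestamp, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = Objects.requireNonNull(headers, "headers");
        }

        // Convenience constructor: same as above, with empty headers.
        public Record(K key, V value, long timestamp) {
            this(key, value, timestamp, new LinkedHashMap<>());
        }

        public K key() { return key; }
        public V value() { return value; }
        public long timestamp() { return timestamp; }
        public Map<String, byte[]> headers() { return headers; }
    }

    public static void main(String[] args) {
        Record<String, String> record = new Record<>("user-1", "clicked", 1601430600000L);
        System.out.println(record.key() + " @ " + record.timestamp()
                + ", headers empty: " + record.headers().isEmpty());
    }
}
```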
>
> So to be clear, the intent of this change is actually to
> _enforce_ that timestamp would never be undefined (which it
> currently can be). Also, since punctuators _are_ going to
> have to "make up" a timestamp going forward, we should note
> that the "punctuate" method currently passes in a good
> timestamp that they can use: for system-time punctuations,
> they receive the current system time, and for stream-time
> punctuations, they get the current stream time.
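
To illustrate the point about punctuators reusing the timestamp they are
handed, here is a rough sketch. The Record, Forwarder and Punctuator
types below are simplified stand-ins defined just for this example (the
single-argument punctuate(long) shape is the only part taken from the
description above); countEmitter is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins only; not the Kafka Streams interfaces themselves.
class PunctuateSketch {

    public static final class Record {
        public final String key;
        public final long value;
        public final long timestamp;

        public Record(String key, long value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for the forwarding half of a processor context.
    public interface Forwarder {
        void forward(Record record);
    }

    // Callback that receives the current system time or stream time.
    public interface Punctuator {
        void punctuate(long timestamp);
    }

    // Builds a punctuator that forwards the current count, stamped with
    // whatever timestamp the framework passes to punctuate().
    public static Punctuator countEmitter(long[] counter, Forwarder downstream) {
        return timestamp -> downstream.forward(new Record("count", counter[0], timestamp));
    }

    public static void main(String[] args) {
        List<Record> forwarded = new ArrayList<>();
        Punctuator punctuator = countEmitter(new long[] {3L}, forwarded::add);
        punctuator.punctuate(1_000L);
        System.out.println(forwarded.get(0).key + "=" + forwarded.get(0).value
                + " @ " + forwarded.get(0).timestamp);
    }
}
```

The punctuator never has to invent a timestamp here; it just passes along
the one it was given.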
>
> The potentially undefined RecordMetadata only contains these
> fields:
> * topic
> * partition
> * offset
>
> These fields aren't required (or even used) in a Sink, and
> it doesn't seem like they would be important to many
> applications. Furthermore, it doesn't _seem_ like you'd even
> want to set these fields. They seem purely informational and
> only useful in the context when you are actually processing
> a real input record. It doesn't sound like you were asking
> for it, but just to put it on the record, I think if we were
> to require values for the metadata from punctuators, people
> would mostly just make up their own dummy values, to no
> one's benefit.
>
> I should also note that with the current
> Record/RecordMetadata split, we will have the freedom to
> move fields into the Record class (or even add new fields)
> if we want them to become "data" as opposed to "metadata" in
> the future.
>
> Thanks for your reply; I was similarly floored when I
> realized the true nature of the current situation. Does my
> reply address your concerns?
>
> Thanks,
> -John
>
> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> wrote:
> > > However, the record metadata is only defined when the parent forwards
> > > while processing a real record, not when it calls forward from the
> > > punctuator
> >
> > Can we take a step back for a second...why wouldn't you be required to
> > set the RecordContext yourself when calling forward from a Punctuator?
> > I think I agree with Paul here, it seems kind of absurd not to enforce
> > that the RecordContext be present inside the process() method.
> >
> > The original problem with Punctuators, as I understood it, was that all
> > of the RecordContext fields were exposed automatically to both the
> > Processor and any Punctuator, due to being direct methods on the
> > ProcessorContext. We can't control which ProcessorContext methods
> > someone will call from within a Punctuator vs from a Processor. The best
> > we could do was set these "nonsense" fields to null when inside a
> > Punctuator, or set them to some dummy values as you pointed out.
> >
> > But then you proposed the solution of a separate RecordContext which is
> > not attached to the ProcessorContext at all. This seemed to solve the
> > above problem very neatly: we only pass in the RecordContext to the
> > process() method, so we don't have to worry about people trying to
> > access these fields from within a Punctuator. The fields aren't
> > accessible unless they're defined.
> >
> > So what happens when someone wants to forward something from within a
> > Punctuator? I don't think it's reasonable to let the timestamp field be
> > undefined, ever. What if the Punctuator forwards directly to a sink, or
> > directly to some windowing logic? Are we supposed to add handling for
> > the RecordContext == null case to every processor? Or are we just going
> > to assume the implicit restriction that users will only forward records
> > from a Punctuator to downstream processors that know how to handle
> > and/or set the RecordContext if it's undefined? That seems to throw
> > away a lot of the awesome safety added in this KIP.
> >
> > Apologies for the rant. But I feel pretty strongly that allowing records
> > to be forwarded from a Punctuator without a defined RecordContext would
> > be asking for trouble. Imo, if you want to forward from a Punctuator,
> > you need to store the info you need in order to set the timestamp, or
> > make one up yourself.
> >
> > (The one alternative I can think of here is that maybe we could pass in
> > the current partition time, so users can at least put in a reasonable
> > estimate for the timestamp that won't cause it to get dropped and won't
> > potentially lurch the stream time far into the future. This would be
> > similar to what we do in the TimestampExtractor.)
> >
> > On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Oh, I guess one other thing I should have mentioned is that I’ve
> > > recently discovered that in cases where the context is undefined, we
> > > currently just fill in dummy values for the context. So there’s a good
> > > chance that real applications in use are depending on undefined
> > > context without even realizing it. What I’m hoping to do is just make
> > > the situation explicit and get rid of the dummy values.
> > >
> > > Thanks,
> > > John
> > >
> > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> > > > Thanks for the review, Paul!
> > > >
> > > > I had read some of that debate before. There seems to be some subtext
> > > > there, because they advise against using Optional in cases like this,
> > > > but there doesn’t seem to be a specific reason why it’s inappropriate.
> > > > I got the impression they were just afraid that people would go
> > > > overboard and make everything Optional.
> > > >
> > > > I could also make two methods, but it seemed like it might be an
> > > > unfortunate way to handle the issue, since Processor is just about a
> > > > Function as-is, but the two-method approach would require people to
> > > > implement both methods.
> > > >
> > > > To your question, this is something that’s only recently become clear
> > > > to me. Imagine you have a parent processor that calls forward both
> > > > from process and from a punctuator. The child will have process()
> > > > invoked in both cases, and won’t be able to distinguish them. However,
> > > > the record metadata is only defined when the parent forwards while
> > > > processing a real record, not when it calls forward from the
> > > > punctuator.
> > > >
> > > > This is why I wanted to make the metadata Optional, to advertise that
> > > > the metadata might be undefined if any ancestor processor ever calls
> > > > forward from a punctuator. We could remove the Optional and instead
> > > > just document that the argument might be null.
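
A small sketch of what the Optional argument asks of a child processor,
assuming a toy RecordMetadata holder with just topic/partition/offset.
The describe() method is a hypothetical stand-in for the body of
process(); none of these names come from the PR:

```java
import java.util.Optional;

// Illustrative stand-ins only; not the Kafka Streams types themselves.
class OptionalMetadataSketch {

    public static final class RecordMetadata {
        public final String topic;
        public final int partition;
        public final long offset;

        public RecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Stand-in for a child's process(): the same method runs whether the
    // parent forwarded from process() (metadata present) or from a
    // punctuator (metadata empty), so it has to handle both.
    public static String describe(String value, Optional<RecordMetadata> metadata) {
        return metadata
                .map(m -> value + " from " + m.topic + "-" + m.partition + "@" + m.offset)
                .orElse(value + " with no metadata");
    }

    public static void main(String[] args) {
        System.out.println(describe("a", Optional.of(new RecordMetadata("input", 0, 5L))));
        System.out.println(describe("b", Optional.empty()));
    }
}
```

With a nullable argument instead of Optional, the same branch would
become a null check inside describe().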
> > > >
> > > > With that context in place, what’s your take?
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > > > > Looks pretty good to me, though the Processor#process(Record,
> > > > > Optional<RecordMetadata>) signature caught my eye.  There's some
> > > > > debate
> > > > > (https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
> > > > > about whether to use Optionals in arguments, and while that's a bit
> > > > > of a religious debate in the abstract, it did make me wonder whether
> > > > > it makes sense in this specific case.  When is it actually not
> > > > > present?  I was under the impression that we should always have
> > > > > access to it in process(), and that the concern about metadata being
> > > > > undefined was about having access to record metadata in the
> > > > > ProcessorContext held for use inside a Punctuator.
> > > > >
> > > > > If that's not the case and it is truly optional in process(), is
> > > > > there an opportunity for an alternate interface for the cases when
> > > > > we don't get it, rather than force the branching on implementers of
> > > > > the interface?
> > > > >
> > > > > Apologies if I've missed something, I took a look at the PR and I
> > > > > didn't see any spots where I thought it would be empty.  Perhaps an
> > > > > example of a Punctuator using (and not using) the new API would
> > > > > clear things up.
> > > > >
> > > > > Best,
> > > > > Paul
> > > > >
> > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > Hello again, all,
> > > > > >
> > > > > > Thanks for the latest round of discussion. I've taken the
> > > > > > recent feedback and come up with an updated KIP that seems
> > > > > > actually quite a bit nicer than the prior proposal.
> > > > > >
> > > > > > The specific diff on the KIP is here:
> > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
> > > > > >
> > > > > > These changes are implemented in this POC PR:
> > > > > > https://github.com/apache/kafka/pull/9346
> > > > > >
> > > > > > The basic idea is that, building on the recent conversation,
> > > > > > we would transition away from the current API where we get
> > > > > > only key/value in the process() method and other "data"
> > > > > > comes in the ProcessorContext along with the "metadata".
> > > > > >
> > > > > > Instead, we formalize what is "data" and what is "metadata",
> > > > > > and pass it all in to the process method:
> > > > > > Processor#process(Record, Optional<RecordMetadata>)
> > > > > >
> > > > > > Also, you forward the whole data class instead of mutating
> > > > > > the ProcessorContext fields and also calling forward:
> > > > > > ProcessorContext#forward(Record)
> > > > > >
> > > > > > The Record class itself ships with methods like
> > > > > > record#withValue(NewV newValue)
> > > > > > that make a shallow copy of the input Record, enabling
> > > > > > Processors to safely handle the record without polluting the
> > > > > > context of their parents and siblings.
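
As a rough illustration of the withValue idea, here is a toy Record whose
withValue returns a new object (possibly with a different value type)
and leaves the input untouched. The class is a simplified stand-in that
omits headers, not the Record from the PR:

```java
// Illustrative stand-in only; not the actual Kafka Streams Record class.
class WithValueSketch {

    public static final class Record<K, V> {
        public final K key;
        public final V value;
        public final long timestamp;

        public Record(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        // Returns a new Record with the given value; key and timestamp are
        // carried over by reference/value, and this object is not modified.
        public <NewV> Record<K, NewV> withValue(NewV newValue) {
            return new Record<>(key, newValue, timestamp);
        }
    }

    public static void main(String[] args) {
        Record<String, String> input = new Record<>("k", "raw", 7L);
        Record<String, Integer> output = input.withValue(input.value.length());
        // The parent's record still holds "raw"; only the copy carries 3.
        System.out.println(input.value + " -> " + output.value);
    }
}
```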
> > > > > >
> > > > > > This proposal has a number of key benefits:
> > > > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > > > > > the Headers via the ProcessorContext. This proposal offers a
> > > > > > way to safely forward changes only to downstream processors.
> > > > > > * The new API has symmetry (each processor's input is the
> > > > > > output of its parent processor)
> > > > > > * The API makes clear that the record metadata isn't always
> > > > > > defined (for example, in a punctuation, there is no current
> > > > > > topic/partition/offset)
> > > > > > * The API enables punctuators to forward well defined
> > > > > > headers downstream, which is currently not possible.
> > > > > >
> > > > > > Unless there are objections, I'll go ahead and re-finalize
> > > > > > this KIP and update that PR to a mergeable state.
> > > > > >
> > > > > > Thanks, all,
> > > > > > -John
> > > > > >
> > > > > >
> > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > > > > > > Interesting proposal. However, I am not totally convinced,
> > > > > > > because I see a fundamental difference between "data" and
> > > > > > > "metadata".
> > > > > > >
> > > > > > > Topic/partition/offset are "metadata" in the strong sense and
> > > > > > > they are immutable.
> > > > > > >
> > > > > > > On the other hand there is "primary" data like key and value,
> > > > > > > as well as "secondary" data like timestamp and headers. The
> > > > > > > issue seems to be that we treat "secondary data" more like
> > > > > > > metadata atm?
> > > > > > >
> > > > > > > Thus, promoting timestamp and headers into a first-class
> > > > > > > citizen role makes sense to me (my original proposal about
> > > > > > > `RecordContext` would still fall short in this regard).
> > > > > > > However, putting both (data and metadata) into a `Record`
> > > > > > > abstraction might go too far?
> > > > > > >
> > > > > > > I am also a little bit concerned about `Record.copy()` because
> > > > > > > it might be a trap: Users might assume it does a full deep copy
> > > > > > > of the record, however, it would not. It would only create a
> > > > > > > new `Record` object as a wrapper that points to the same
> > > > > > > key/value/header objects as the input record.
> > > > > > >
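The shallow-copy concern can be made concrete with a small sketch. It
assumes a toy Record with mutable map headers; withValueAndCopiedHeaders
is a hypothetical method added only to contrast with the shallow variant,
and nothing here is the actual API from the KIP:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in only; not the actual Kafka Streams Record class.
class ShallowCopySketch {

    public static final class Record {
        public final String key;
        public final String value;
        // Assumption: headers modeled as a mutable map for brevity.
        public final Map<String, String> headers;

        public Record(String key, String value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }

        // Shallow copy: a new wrapper that points at the SAME headers map.
        public Record withValue(String newValue) {
            return new Record(key, newValue, headers);
        }

        // Hypothetical alternative: also copies the headers map.
        public Record withValueAndCopiedHeaders(String newValue) {
            return new Record(key, newValue, new HashMap<>(headers));
        }
    }

    public static void main(String[] args) {
        Record original = new Record("k", "v", new HashMap<>());

        Record shallow = original.withValue("v2");
        shallow.headers.put("trace", "abc");
        // The mutation is visible through the original record as well.
        System.out.println("original sees trace: " + original.headers.containsKey("trace"));

        Record copied = original.withValueAndCopiedHeaders("v3");
        copied.headers.put("other", "x");
        // This mutation stays local to the copy.
        System.out.println("original sees other: " + original.headers.containsKey("other"));
    }
}
```
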
> > > > > > > With the current `context.forward(key, value)` we don't have
> > > > > > > this "deep copy" issue -- it's pretty clear what is happening.
> > > > > > >
> > > > > > > Instead of `To.all().withTimestamp()` we could also add
> > > > > > > `context.forward(key, value, timestamp)` etc (just wondering
> > > > > > > about the explosion in overloads)?
> > > > > > >
> > > > > > > Also, `Record.withValue` etc sounds odd? Should a record not be
> > > > > > > immutable? So, we could have something like
> > > > > > >
> > > > > > > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> > > > > > >
> > > > > > > But it looks rather verbose?
> > > > > > >
> > > > > > > The other question is of course, to what extent do we want to
> > > > > > > keep the distinction between "primary" and "secondary" data? To
> > > > > > > me, it's a question of ease of use?
> > > > > > >
> > > > > > > Just putting all this out to move the discussion forward. Don't
> > > > > > > have a concrete proposal atm.
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > >
> > > > > > > On 9/14/20 9:24 AM, John Roesler wrote:
> > > > > > > > Thanks for this thought, Matthias!
> > > > > > > >
> > > > > > > > To be honest, it's bugged me quite a bit that _all_ the
> > > > > > > > record information hasn't been an argument to `process`. I
> > > > > > > > suppose I was trying to be conservative in this proposal,
> > > > > > > > but then again, if we're adding new Processor and
> > > > > > > > ProcessorContext interfaces, then this is the time to make
> > > > > > > > such a change.
> > > > > > > >
> > > > > > > > To be unambiguous, I think this is what we're talking about:
> > > > > > > > ProcessorContext:
> > > > > > > > * applicationId
> > > > > > > > * taskId
> > > > > > > > * appConfigs
> > > > > > > > * appConfigsWithPrefix
> > > > > > > > * keySerde
> > > > > > > > * valueSerde
> > > > > > > > * stateDir
> > > > > > > > * metrics
> > > > > > > > * schedule
> > > > > > > > * commit
> > > > > > > > * forward
> > > > > > > >
> > > > > > > > StateStoreContext:
> > > > > > > > * applicationId
> > > > > > > > * taskId
> > > > > > > > * appConfigs
> > > > > > > > * appConfigsWithPrefix
> > > > > > > > * keySerde
> > > > > > > > * valueSerde
> > > > > > > > * stateDir
> > > > > > > > * metrics
> > > > > > > > * register
> > > > > > > >
> > > > > > > >
> > > > > > > > RecordContext
> > > > > > > > * topic
> > > > > > > > * partition
> > > > > > > > * offset
> > > > > > > > * timestamp
> > > > > > > > * headers
> > > > > > > >
> > > > > > > >
> > > > > > > > Your proposal sounds good to me as-is. Just to cover the
> > > > > > > > bases, though, I'm wondering if we should push the idea just
> > > > > > > > a little farther. Instead of decomposing key,value,context,
> > > > > > > > we could just keep them all in one object, like this:
> > > > > > > >
> > > > > > > > Record:
> > > > > > > > * key
> > > > > > > > * value
> > > > > > > > * topic
> > > > > > > > * partition
> > > > > > > > * offset
> > > > > > > > * timestamp
> > > > > > > > * headers
> > > > > > > >
> > > > > > > > Then, we could have:
> > > > > > > > Processor#process(Record)
> > > > > > > > ProcessorContext#forward(Record, To)
> > > > > > > >
> > > > > > > > Viewed from this perspective, a record has three properties
> > > > > > > > that people may specify in their processors: key, value, and
> > > > > > > > timestamp.
> > > > > > > >
> > > > > > > > We could deprecate `To#withTimestamp` and enable people to
> > > > > > > > specify the timestamp along with the key and value when they
> > > > > > > > forward a record.
> > > > > > > >
> > > > > > > > E.g.,
> > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record)
> > > > > > > > toForward.withKey(newKey)
> > > > > > > > toForward.withValue(newValue)
> > > > > > > > toForward.withTimestamp(newTimestamp)
> > > > > > > > Record newRecord = toForward.build()
> > > > > > > > context.forward(newRecord, To.child("child1"))
> > > > > > > >
> > > > > > > > Or, the more compact common case:
> > > > > > > > current:
> > > > > > > >  context.forward(key, "newValue")
> > > > > > > > proposed:
> > > > > > > >  context.forward(copy(record).withValue("newValue").build())
> > > > > > > >
> > > > > > > >
> > > > > > > > It's slightly more verbose, but also more extensible. This
> > > > > > > > would give us a clean path to add header support in PAPI as
> > > > > > > > well, simply by adding `withHeaders` in RecordBuilder.
> > > > > > > >
> > > > > > > > It's also more symmetrical, since the recipient of `forward`
> > > > > > > > would just get the sent `Record`. Whereas today, the sender
> > > > > > > > puts the timestamp in `To`, but the recipient gets it in its
> > > > > > > > own `ProcessorContext`.
> > > > > > > >
> > > > > > > > WDYT?
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
> > > > > > > > > I think separating the different contexts makes sense.
> > > > > > > > >
> > > > > > > > > In fact, we could even go one step further and remove the
> > > > > > > > > record context from the processor context completely and
> > > > > > > > > add a third parameter to `process(key, value,
> > > > > > > > > recordContext)`. This would make it clear that the context
> > > > > > > > > is for the input record only and it's not possible to pass
> > > > > > > > > it to a `punctuate` callback.
> > > > > > > > >
> > > > > > > > > For the stores and changelogging: I think there are two
> > > > > > > > > cases. (1) You use a plain key-value store. For this case,
> > > > > > > > > it seems you do not care about the timestamp and thus do
> > > > > > > > > not care what timestamp is set in the changelog records.
> > > > > > > > > (We can set anything we want, as it's not relevant at all
> > > > > > > > > -- the timestamp is ignored on read anyway.) (2) The other
> > > > > > > > > case is that one does care about timestamps, and for this
> > > > > > > > > case one should use TimestampedKeyValueStore. The passed
> > > > > > > > > timestamp will be set on the changelog records for this
> > > > > > > > > case.
> > > > > > > > >
> > > > > > > > > Thus, for both cases, accessing the record context does not
> > > > > > > > > seem to be a requirement. And providing access to the
> > > > > > > > > processor context to, e.g., `forward()` or similar seems
> > > > > > > > > safe.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote:
> > > > > > > > > > Thanks for the reply, Paul!
> > > > > > > > > >
> > > > > > > > > > I certainly intend to make sure that the changelogging
> > > > > > > > > > layer continues to work the way it does now, by hook or
> > > > > > > > > > by crook. I think the easiest path for me is to just
> > > > > > > > > > "cheat" and get the real ProcessorContext into the
> > > > > > > > > > ChangeLoggingStore implementation somehow. I'll tag you
> > > > > > > > > > on the PR when I create it, so you have an opportunity to
> > > > > > > > > > express a preference about the implementation choice, and
> > > > > > > > > > maybe even compile/test against it to make sure your
> > > > > > > > > > stuff still works.
> > > > > > > > > >
> > > > > > > > > > Regarding this:
> > > > > > > > > >
> > > > > > > > > > > we have an interest in making a state store with a
> > > > > > > > > > > richer way of querying its data (like perhaps getting
> > > > > > > > > > > all values associated with a secondary key), while
> > > > > > > > > > > still ultimately writing to the changelog topic for
> > > > > > > > > > > later restoration.
> > > > > > > > > >
> > > > > > > > > > This is very intriguing to me. On the side, I've been
> > > > > > > > > > preparing a couple of ideas related to this topic. I
> > > > > > > > > > don't think I have a coherent enough thought to even
> > > > > > > > > > express it in a Jira right now, but when I do, I'll tag
> > > > > > > > > > you on it also to see what you think.
> > > > > > > > > >
> > > > > > > > > > Whenever you're ready to share the usability improvement
> > > > > > > > > > ideas, I'm very interested to see what you've come up
> > > > > > > > > > with.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > > > > > > > > > > > when you use a HashMap or RocksDB or other "state
> > > > > > > > > > > > stores", you don't expect them to automatically know
> > > > > > > > > > > > extra stuff about the record you're storing.
> > > > > > > > > > >
> > > > > > > > > > > > So, I don't think there is any reason we *can't*
> > > > > > > > > > > > retain the record context in the StateStoreContext,
> > > > > > > > > > > > and if any users came along with a clear use case I'd
> > > > > > > > > > > > find that convincing.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > I agree with the principle of being conservative with
> > > > > > > > > > > the StateStoreContext API.  Regarding user expectations
> > > > > > > > > > > or a clear use case, the only counterpoint I would
> > > > > > > > > > > offer is that we sort of have that use case already,
> > > > > > > > > > > which is the example I gave of the change logging store
> > > > > > > > > > > using the timestamp.  I am curious if this
> > > > > > > > > > > functionality will be retained when using built in
> > > > > > > > > > > state stores, or will a low-level processor get a
> > > > > > > > > > > KeyValueStore that no longer writes to the changelog
> > > > > > > > > > > topic with the record's timestamp.  While I personally
> > > > > > > > > > > don't care much about that functionality specifically,
> > > > > > > > > > > I have a general desire for custom state stores to
> > > > > > > > > > > easily do the things that built in state stores do.
> > > > > > > > > > >
> > > > > > > > > > > > It genuinely did not occur to me that users might be
> > > > > > > > > > > > looking up and/or updating records of other keys from
> > > > > > > > > > > > within a Processor.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > I'm glad you said this Sophie, because it gives me an
> > > > > > > > > > > opportunity to say that this is actually a *huge* use
> > > > > > > > > > > case for my team.  The state store usability
> > > > > > > > > > > improvements I was referring to in my previous message
> > > > > > > > > > > were about enabling the user to write custom stores
> > > > > > > > > > > while still easily hooking into the ability to write to
> > > > > > > > > > > a changelog topic.  I think that is technically
> > > > > > > > > > > possible now, but I don't think it's trivial.
> > > > > > > > > > > Specifically, we have an interest in making a state
> > > > > > > > > > > store with a richer way of querying its data (like
> > > > > > > > > > > perhaps getting all values associated with a secondary
> > > > > > > > > > > key), while still ultimately writing to the changelog
> > > > > > > > > > > topic for later restoration.  We recognize that this
> > > > > > > > > > > use case throws away some of what Kafka Streams
> > > > > > > > > > > (especially the DSL) is good at - easy parallelizability
> > > > > > > > > > > by partitioning all processing by key - and that our
> > > > > > > > > > > business logic would completely fall apart if we were
> > > > > > > > > > > consuming from multi-partition topics with multiple
> > > > > > > > > > > consumers.  But we have found that using the low level
> > > > > > > > > > > processor API is good for the very simple stream
> > > > > > > > > > > processing primitives it provides: handling the
> > > > > > > > > > > plumbing of consuming from multiple Kafka topics and
> > > > > > > > > > > potentially updating persistent local state in a
> > > > > > > > > > > reliable way.  That in itself has proven to be a
> > > > > > > > > > > worthwhile programming model.
> > > > > > > > > > >
> > > > > > > > > > > Since I got off track a bit, let me summarize: I don't
> > > > > > > > > > > particularly care about the record context being
> > > > > > > > > > > available to state store implementations, and I think
> > > > > > > > > > > this KIP is headed in the right direction in that
> > > > > > > > > > > regard.  But more generally, I wanted to express the
> > > > > > > > > > > importance of maintaining a powerful and flexible
> > > > > > > > > > > StateStore interface.
> > > > > > > > > > >
> > > > > > > > > > > Thanks!
> > > > > > > > > > > Paul
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Aha, I did misinterpret the example in your previous
> > > > > > > > > > > > response regarding the range query after all. I
> > > > > > > > > > > > thought you just meant a time-range query inside a
> > > > > > > > > > > > punctuator. It genuinely did not occur to me that
> > > > > > > > > > > > users might be looking up and/or updating records of
> > > > > > > > > > > > other keys from within a Processor. Sorry for being
> > > > > > > > > > > > closed minded.
> > > > > > > > > > > >
> > > > > > > > > > > > I won't drag out this discussion any further by
> > > > > > > > > > > > asking whether that might be a valid use case or just
> > > > > > > > > > > > a lurking bug in itself :)
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for humoring me. The current proposal for
> > > > > > > > > > > > KIP-478 sounds good to me
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > Ah, thanks Sophie,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'm sorry for misinterpreting your response. Yes,
> > > > > > > > > > > > > we absolutely can and should clear the context
> > > > > > > > > > > > > before punctuating.
> > > > > > > > > > > > >
> > > > > > > > > > > > > My secondary concern is maybe more far-fetched. I
> > > > > > > > > > > > > was thinking that inside process(key,value), a
> > > > > > > > > > > > > Processor might do a get/put of a _different_ key.
> > > > > > > > > > > > > Consider, for example, the way that Suppress
> > > > > > > > > > > > > processors work. When they get a record, they add
> > > > > > > > > > > > > it to the store and then do a range scan and
> > > > > > > > > > > > > possibly forward a _different_ record. Of course,
> > > > > > > > > > > > > this is an operation that is deeply coupled to the
> > > > > > > > > > > > > internals, and the Suppress processor accordingly
> > > > > > > > > > > > > actually does get access to the internal context so
> > > > > > > > > > > > > that it can set the context before forwarding.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Still, it seems like I've had a handful of
> > > > > > > > > > > > > conversations with people over the years in which
> > > > > > > > > > > > > they tell me they are using state stores in a way
> > > > > > > > > > > > > that transcends the "get and put the currently
> > > > > > > > > > > > > processing record" access pattern. I doubt that
> > > > > > > > > > > > > those folks would even have considered the
> > > > > > > > > > > > > possibility that the currently processing record's
> > > > > > > > > > > > > _context_ could pollute their state store
> > > > > > > > > > > > > operations, as I myself never gave it a second
> > > > > > > > > > > > > thought until the current conversation began. In
> > > > > > > > > > > > > cases like that, we have actually set a trap for
> > > > > > > > > > > > > these people, and it seems better to dismantle the
> > > > > > > > > > > > > trap.
> > > > > > > > > > > > >
> > > > > > > > > > > > > As you noted, really the only people who would be
> > > > > > > > > > > > > negatively impacted are people who implement their
> > > > > > > > > > > > > own state stores. These folks will get the
> > > > > > > > > > > > > deprecation warning and try to adapt their stores
> > > > > > > > > > > > > to the new interface. If they needed access to the
> > > > > > > > > > > > > record context, they would find it's now missing.
> > > > > > > > > > > > > They'd ask us about it, and we'd have the ability
> > > > > > > > > > > > > to explain the lurking bug that they have had in
> > > > > > > > > > > > > their stores all along, as well as the new
> > > > > > > > > > > > > recommended pattern (just pass everything you need
> > > > > > > > > > > > > in the value). If that's unsatisfying, _then_ we
> > > > > > > > > > > > > should consider amending the API.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman wrote:
> > > > > > > > > > > > > > > Regarding your first sentence, "...the
> > > > > > > > > > > > > > > processor would null out the record
> > > > > > > > > > > > > > > context...", this is not possible, since the
> > > > > > > > > > > > > > > processor doesn't have write access to the
> > > > > > > > > > > > > > > context. We could add it,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Sorry, this was poorly phrased, I definitely did
> > > > > > > > > > > > > > not mean to imply that we should make the context
> > > > > > > > > > > > > > modifiable by the Processors themselves. I meant
> > > > > > > > > > > > > > this should be handled by the internal processing
> > > > > > > > > > > > > > framework that deals with passing records from
> > > > > > > > > > > > > > one Processor to the next, setting the record
> > > > > > > > > > > > > > context when a new record is picked up, invoking
> > > > > > > > > > > > > > the punctuators, etc. I believe this all
> > > > > > > > > > > > > > currently happens in the StreamTask? It already
> > > > > > > > > > > > > > can and does overwrite the record context as new
> > > > > > > > > > > > > > records are processed, and is also responsible
> > > > > > > > > > > > > > for calling the punctuators, so it doesn't seem
> > > > > > > > > > > > > > like a huge leap to just say "null out the
> > > > > > > > > > > > > > current record before punctuating"
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > To clarify, I was never advocating or even
> > > considering
> > > > > > > giving the
> > > > > > > > > > > > > > Processors
> > > > > > > > > > > > > > write access to the record context. Sorry if my
> last
> > > > > > message (or all of
> > > > > > > > > > > > > > them)
> > > > > > > > > > > > > > was misleading. I just wanted to point out that
> the
> > > > > > punctuator concern
> > > > > > > > > > > > is
> > > > > > > > > > > > > > orthogonal to the question of whether we should
> > > include
> > > > > > the record
> > > > > > > > > > > > > context
> > > > > > > > > > > > > > in the StateStoreContext. It's definitely a real
> > > problem,
> > > > > > but it's a
> > > > > > > > > > > > > > problem
> > > > > > > > > > > > > > that exists at the Processor level and not just
> the
> > > > > > StateStore.
> > > > > > > > > > > > > > So, I don't think there is any reason we *can't*
> > > retain
> > > > > > the record
> > > > > > > > > > > > > context
> > > > > > > > > > > > > > in the
> > > > > > > > > > > > > > StateStoreContext, and if any users came along
> with a
> > > > > > clear use case
> > > > > > > > > > > > I'd
> > > > > > > > > > > > > > find
> > > > > > > > > > > > > > that convincing. In the absence of any examples,
> the
> > > > > > conservative
> > > > > > > > > > > > > approach
> > > > > > > > > > > > > > sounds good to me.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > If it turns out that someone did need the record
> > > context
> > > > > > in their
> > > > > > > > > > > > custom
> > > > > > > > > > > > > > state
> > > > > > > > > > > > > > store, I'm sure they'll submit a politely worded
> bug
> > > > > > report alerting us
> > > > > > > > > > > > > > that we
> > > > > > > > > > > > > > broke their application.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <
> > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > Thanks, Sophie,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Yes, now that you point it out, I can see that
> the
> > > record
> > > > > > > > > > > > > > > context itself should be nulled out by Streams
> > > before
> > > > > > > > > > > > > > > invoking punctuators. From that perspective, we
> > > don't
> > > > > > need
> > > > > > > > > > > > > > > to think about the second-order problem of
> what's
> > > in the
> > > > > > > > > > > > > > > context for the state store when called from a
> > > > > > punctuator.
> > > > > > > > > > > > > > > Regarding your first sentence, "...the
> processor
> > > would
> > > > > > null
> > > > > > > > > > > > > > > out the record context...", this is not
> possible,
> > > since
> > > > > > the
> > > > > > > > > > > > > > > processor doesn't have write access to the
> > > context. We
> > > > > > could
> > > > > > > > > > > > > > > add it, but then all kinds of strange effects
> > > would ensue
> > > > > > > > > > > > > > > when downstream processors execute but the
> context
> > > is
> > > > > > empty,
> > > > > > > > > > > > > > > etc. Better to just let the framework manage
> the
> > > record
> > > > > > > > > > > > > > > context and keep it read-only for Processors.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Reading between the lines of your last reply,
> it
> > > sounds
> > > > > > > like
> > > > > > > > > > > > > > > the disconnect may just have been a mutual
> > > > > > misunderstanding
> > > > > > > > > > > > > > > about whether or not Processors currently have
> > > access to
> > > > > > set
> > > > > > > > > > > > > > > the record context. Since they do not, if we
> > > wanted to
> > > > > > add
> > > > > > > > > > > > > > > the record context to StateStoreContext in a
> > > well-defined
> > > > > > > > > > > > > > > way, we'd also have to add the ability for
> > > Processors to
> > > > > > > > > > > > > > > manipulate it. But then, we're just creating a
> > > > > > side-channel
> > > > > > > > > > > > > > > for Processors to pass some information in
> > > arguments to
> > > > > > > > > > > > > > > "put()" and other information implicitly
> through
> > > the
> > > > > > > > > > > > > > > context. It seems better just to go for a
> single
> > > channel
> > > > > > for
> > > > > > > > > > > > > > > now.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It sounds like you're basically in favor of the
> > > > > > conservative
> > > > > > > > > > > > > > > approach, and you just wanted to understand the
> > > blockers
> > > > > > > > > > > > > > > that I implied. Does my clarification make
> sense?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie
> > > Blee-Goldman
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > I was just thinking that the processor would
> > > null out
> > > > > > the record
> > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > after it
> > > > > > > > > > > > > > > > finished processing the record, so I'm not
> sure I
> > > > > > follow why this
> > > > > > > > > > > > > would
> > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > possible? AFAIK we never call a punctuator
> in the
> > > > > > middle of
> > > > > > > > > > > > > processing a
> > > > > > > > > > > > > > > > record through the topology, and even if we
> did,
> > > we
> > > > > > still know when
> > > > > > > > > > > > > it is
> > > > > > > > > > > > > > > > about
> > > > > > > > > > > > > > > > to be called and could set it to null
> beforehand.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I'm not trying to advocate for it here, I'm
> in
> > > > > > agreement that
> > > > > > > > > > > > > anything
> > > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > want
> > > > > > > > > > > > > > > > to access within the store can and should be
> > > accessed
> > > > > > within the
> > > > > > > > > > > > > calling
> > > > > > > > > > > > > > > > Processor/Punctuator before reaching the
> store.
> > > The
> > > > > > "we can always
> > > > > > > > > > > > > add it
> > > > > > > > > > > > > > > > later if necessary" argument is also pretty
> > > > > > convincing. Just trying
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > understand
> > > > > > > > > > > > > > > > why this wouldn't be possible.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > FWIW, the question of "what is the current
> > > record in
> > > > > > the context
> > > > > > > > > > > > of a
> > > > > > > > > > > > > > > > Punctuator"
> > > > > > > > > > > > > > > > exists independently of whether we want to
> add
> > > this to
> > > > > > the
> > > > > > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > > > or not. The full ProcessorContext, including
> the
> > > > > > current record
> > > > > > > > > > > > > context,
> > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > already available within a Punctuator, so
> > > removing the
> > > > > > current
> > > > > > > > > > > > record
> > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > from the StateStoreContext does not solve the
> > > problem.
> > > > > > Users can --
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > > (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>)
> > > > > > > > > > > > > --
> > > > > > > > > > > > > > > hit
> > > > > > > > > > > > > > > > such subtle bugs without ever invoking a
> > > StateStore
> > > > > > > > > > > > > > > > from their punctuator.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Again, I think I do agree that we should
> leave
> > > the
> > > > > > current record
> > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > off of
> > > > > > > > > > > > > > > > the StateStoreContext, but I don't think the
> > > > > > Punctuator argument
> > > > > > > > > > > > > against
> > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > very convincing. It sounds to me like we
> need to
> > > > > > disallow access to
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > current
> > > > > > > > > > > > > > > > record context from within the Punctuator,
> > > independent
> > > > > > of anything
> > > > > > > > > > > > > to do
> > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > state stores
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler
> <
> > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > Thanks for the thoughts, Sophie.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I agree that the extra information could be
> > > useful.
> > > > > > My only
> > > > > > > > > > > > > concern is
> > > > > > > > > > > > > > > > > that it doesn’t seem like we can actually
> > > supply
> > > > > > that extra
> > > > > > > > > > > > > information
> > > > > > > > > > > > > > > > > correctly. So, then we have a situation
> where
> > > the
> > > > > > system offers
> > > > > > > > > > > > > useful
> > > > > > > > > > > > > > > API
> > > > > > > > > > > > > > > > > calls that are only correct in a narrow
> range
> > > of use
> > > > > > cases.
> > > > > > > > > > > > > Outside of
> > > > > > > > > > > > > > > > > those use cases, you get incorrect
> behavior.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > If it were possible to null out the context
> > > before
> > > > > > you put a
> > > > > > > > > > > > > document
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > which the context doesn’t apply, then the
> > > concern
> > > > > > would be
> > > > > > > > > > > > > mitigated.
> > > > > > > > > > > > > > > But
> > > > > > > > > > > > > > > > > it would still be pretty weird from the
> > > perspective
> > > > > > of the store
> > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > sometimes the context is populated and
> other
> > > times,
> > > > > > it’s null.
> > > > > > > > > > > > > > > > > But that seems moot, since it doesn’t seem
> > > possible
> > > > > > to null out
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > context. Only the Processor could know
> whether
> > > it’s
> > > > > > about to put
> > > > > > > > > > > > a
> > > > > > > > > > > > > > > document
> > > > > > > > > > > > > > > > > different from the context or not. And it
> > > would be
> > > > > > inappropriate
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > offer a
> > > > > > > > > > > > > > > > > public ProcessorContext api to manage the
> > > record
> > > > > > context.
> > > > > > > > > > > > > > > > > Ultimately, it still seems like if you
> want to
> > > store
> > > > > > headers, you
> > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > > store them explicitly, right? That doesn’t
> seem
> > > > > > onerous to me,
> > > > > > > > > > > > and
> > > > > > > > > > > > > it
> > > > > > > > > > > > > > > kind
> > > > > > > > > > > > > > > > > of seems better than relying on undefined
> or
> > > > > > asymmetrical
> > > > > > > > > > > > behavior
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > store itself.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t
> solve
> > > these
> > > > > > problems.
> > > > > > > > > > > > Just
> > > > > > > > > > > > > > > that it
> > > > > > > > > > > > > > > > > > seems that we can be conservative
> and
> > > avoid
> > > > > > them for
> > > > > > > > > > > > now.
> > > > > > > > > > > > > If
> > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > > turns out we really need to solve them, we
> can
> > > > > > always do it
> > > > > > > > > > > > later.
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie
> > > Blee-Goldman
> > > > > > wrote:
> > > > > > > > > > > > > > > > > > > If you were to call "put" from a
> > > punctuator, or
> > > > > > do a
> > > > > > > > > > > > > > > > > > > `range()` query and then update one of
> > > those
> > > > > > records with
> > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug
> on
> > > your
> > > > > > hands.
> > > > > > > > > > > > > > > > > > Can you elaborate on this a bit? I agree
> > > that the
> > > > > > punctuator
> > > > > > > > > > > > > case is
> > > > > > > > > > > > > > > an
> > > > > > > > > > > > > > > > > > > obvious exception to the assumption that
> > > store
> > > > > > invocations
> > > > > > > > > > > > always
> > > > > > > > > > > > > > > > > > have a corresponding "current record",
> but I
> > > don't
> > > > > > understand
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > second example. Are you envisioning a
> > > scenario
> > > > > > where the
> > > > > > > > > > > > #process
> > > > > > > > > > > > > > > > > > method performs a range query and then
> > > updates
> > > > > > records? Or were
> > > > > > > > > > > > > > > > > > you just giving another example of the
> > > punctuator
> > > > > > case?
> > > > > > > > > > > > > > > > > > I only bring it up because I agree that
> the
> > > > > > current record
> > > > > > > > > > > > > > > information
> > > > > > > > > > > > > > > > > could
> > > > > > > > > > > > > > > > > > still be useful within the context of the
> > > store.
> > > > > > As a non-user
> > > > > > > > > > > > my
> > > > > > > > > > > > > > > input
> > > > > > > > > > > > > > > > > on
> > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > definitely has limited value, but it just
> > > isn't
> > > > > > striking me as
> > > > > > > > > > > > > > > obvious
> > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > should remove access to the current
> record
> > > context
> > > > > > from the
> > > > > > > > > > > > state
> > > > > > > > > > > > > > > stores.
> > > > > > > > > > > > > > > > > > If there is no current record, as in the
> > > > > > punctuator case, we
> > > > > > > > > > > > > should
> > > > > > > > > > > > > > > just
> > > > > > > > > > > > > > > > > > set
> > > > > > > > > > > > > > > > > > the record context to null (or
> > > Optional.empty,
> > > > > > etc).
> > > > > > > > > > > > > > > > > > That said, the put() always has to come
> from
> > > > > > somewhere, and
> > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > somewhere is always going to be either a
> > > Processor
> > > > > > or a
> > > > > > > > > > > > > Punctuator,
> > > > > > > > > > > > > > > both
> > > > > > > > > > > > > > > > > > of which will still have access to the
> full
> > > > > > context. So
> > > > > > > > > > > > > additional
> > > > > > > > > > > > > > > info
> > > > > > > > > > > > > > > > > > such as
> > > > > > > > > > > > > > > > > > the timestamp can and should probably be
> > > supplied
> > > > > > to the store
> > > > > > > > > > > > > before
> > > > > > > > > > > > > > > > > > calling put(), rather than looked up by
> the
> > > store.
> > > > > > But I can
> > > > > > > > > > > > see
> > > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > other
> > > > > > > > > > > > > > > > > > things being useful, for example the
> current
> > > > > > record's headers.
> > > > > > > > > > > > > Maybe
> > > > > > > > > > > > > > > > > if/when
> > > > > > > > > > > > > > > > > > we add better (or any) support for
> headers in
> > > > > > state stores this
> > > > > > > > > > > > > will
> > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > less true.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Of course as John has made clear, it's
> > > pretty hard
> > > > > > to judge
> > > > > > > > > > > > > without
> > > > > > > > > > > > > > > > > > examples
> > > > > > > > > > > > > > > > > > and more insight as to what actually
> goes on
> > > > > > within a custom
> > > > > > > > > > > > > state
> > > > > > > > > > > > > > > store
> > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John
> Roesler <
> > > > > > > > > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > Hi Paul,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > It's good to hear from you!
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I'm glad you're in favor of the
> direction.
> > > > > > Especially when
> > > > > > > > > > > > > > > > > > > it comes to public API and usability
> > > concerns, I
> > > > > > tend to
> > > > > > > > > > > > > > > > > > > think that "the folks who matter" are
> > > actually
> > > > > > the folks who
> > > > > > > > > > > > > > > > > > > have to use the APIs to accomplish real
> > > tasks.
> > > > > > It can be
> > > > > > > > > > > > > > > > > > > hard for me to be sure I'm thinking
> > > clearly from
> > > > > > that
> > > > > > > > > > > > > > > > > > > perspective.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Funny story, I also started down this
> road
> > > a
> > > > > > couple of times
> > > > > > > > > > > > > > > > > > > already and backed them out before the
> KIP
> > > > > > because I was
> > > > > > > > > > > > > > > > > > > afraid of the scope of the proposal.
> > > > > > Unfortunately, needing
> > > > > > > > > > > > > > > > > > > to make a new ProcessorContext kind of
> > > forced my
> > > > > > hand.
> > > > > > > > > > > > > > > > > > > I see you've called me out about the
> > > > > > ChangeLogging stores :)
> > > > > > > > > > > > > > > > > > > In fact, I think these are the
> main/only
> > > reason
> > > > > > that stores
> > > > > > > > > > > > > > > > > > > might really need to invoke
> "forward()". My
> > > > > > secret plan was
> > > > > > > > > > > > > > > > > > > to cheat and either accomplish
> > > change-logging by
> > > > > > a different
> > > > > > > > > > > > > > > > > > > mechanism than implementing the store
> > > interface,
> > > > > > or by just
> > > > > > > > > > > > > > > > > > > breaking encapsulation to sneak the
> "real"
> > > > > > ProcessorContext
> > > > > > > > > > > > > > > > > > > into the ChangeLogging stores. But
> those
> > > are all
> > > > > > > > > > > > > > > > > > > implementation details. I think the key
> > > question
> > > > > > is whether
> > > > > > > > > > > > > > > > > > > anyone else has a store implementation
> that
> > > > > > needs to call
> > > > > > > > > > > > > > > > > > > "forward()". It's not what you
> mentioned,
> > > but
> > > > > > since you
> > > > > > > > > > > > > > > > > > > spoke up, I'll just ask: if you have a
> use
> > > case
> > > > > > for calling
> > > > > > > > > > > > > > > > > > > "forward()" in a store, please share
> it.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Regarding the other record-specific
> context
> > > > > > methods, I think
> > > > > > > > > > > > > > > > > > > you have a good point, but I also can't
> > > quite
> > > > > > wrap my head
> > > > > > > > > > > > > > > > > > > around how we can actually guarantee
> it to
> > > work
> > > > > > in general.
> > > > > > > > > > > > > > > > > > > For example, the case you cited, where
> the
> > > > > > implementation of
> > > > > > > > > > > > > > > > > > > `KeyValueStore#put(key, value)` uses
> the
> > > context
> > > > > > to augment
> > > > > > > > > > > > > > > > > > > the record with timestamp information.
> This
> > > > > > relies on the
> > > > > > > > > > > > > > > > > > > assumption that you would only call
> > > "put()" from
> > > > > > inside a
> > > > > > > > > > > > > > > > > > > `Processor#process(key, value)` call in
> > > which
> > > > > > the record
> > > > > > > > > > > > > > > > > > > being processed is the same record that
> > > you're
> > > > > > trying to put
> > > > > > > > > > > > > > > > > > > into the store.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > If you were to call "put" from a
> > > punctuator, or
> > > > > > do a
> > > > > > > > > > > > > > > > > > > `range()` query and then update one of
> > > those
> > > > > > records with
> > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug
> on
> > > your
> > > > > > hands. Right
> > > > > > > > > > > > > > > > > > > now, the Streams component that
> actually
> > > calls
> > > > > > the Processor
> > > > > > > > > > > > > > > > > > > takes care to set the right record
> context
> > > > > > before invoking
> > > > > > > > > > > > > > > > > > > the method, and in the case of caching,
> > > etc., it
> > > > > > also takes
> > > > > > > > > > > > > > > > > > > care to swap out the old context and
> keep
> > > it
> > > > > > somewhere safe.
> > > > > > > > > > > > > > > > > > > But when it comes to public API
> Processors
> > > > > > calling methods
> > > > > > > > > > > > > > > > > > > on StateStores, there's no opportunity
> for
> > > any
> > > > > > component to
> > > > > > > > > > > > > > > > > > > make sure the context is always
> correct.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > In the face of that situation, it
> seemed
> > > better
> > > > > > to just move
> > > > > > > > > > > > > > > > > > > in the direction of a "normal" data
> store.
> > > I.e.,
> > > > > > when you
> > > > > > > > > > > > > > > > > > > use a HashMap or RocksDB or other
> "state
> > > > > > stores", you don't
> > > > > > > > > > > > > > > > > > > expect them to automatically know extra
> > > stuff
> > > > > > about the
> > > > > > > > > > > > > > > > > > > record you're storing. If you need
> them to
> > > know
> > > > > > something,
> > > > > > > > > > > > > > > > > > > you just put it in the value.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > All of that said, I'm just reasoning
> from
> > > first
> > > > > > principles
> > > > > > > > > > > > > > > > > > > here. To really know if this is a
> mistake
> > > or
> > > > > > not, I need to
> > > > > > > > > > > > > > > > > > > be in your place. So please push back
> if
> > > you
> > > > > > think what I
> > > > > > > > > > > > > > > > > > > said is nonsense. My personal plan was
> to
> > > keep
> > > > > > an eye out
> > > > > > > > > > > > > > > > > > > during the period where the old API was
> > > still
> > > > > > present, but
> > > > > > > > > > > > > > > > > > > deprecated, to see if people were
> > > struggling to
> > > > > > use the new
> > > > > > > > > > > > > > > > > > > API. If so, then we'd have a chance to
> > > address
> > > > > > it before
> > > > > > > > > > > > > > > > > > > dropping the old API. But it's even
> better
> > > if
> > > > > > you can help
> > > > > > > > > > > > > > > > > > > think it through now.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > It did also cross my mind to _not_ add
> the
> > > > > > > > > > > > > > > > > > > StateStoreContext, but just to
> continue to
> > > punt
> > > > > > on the
> > > > > > > > > > > > > > > > > > > question by just dropping in the new
> > > > > > ProcessorContext to the
> > > > > > > > > > > > > > > > > > > new init method. If StateStoreContext
> > > seems too
> > > > > > bold, we can
> > > > > > > > > > > > > > > > > > > go that direction. But if we actually
> add
> > > some
> > > > > > methods to
> > > > > > > > > > > > > > > > > > > StateStoreContext, I'd like to be able
> to
> > > ensure
> > > > > > they would
> > > > > > > > > > > > > > > > > > > be well defined. I think the current
> > > situation
> > > > > > was more of
> > > > > > > > > > > > > > > > > > > an oversight than a choice.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks again for your reply,
> > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul
> > > Whalen
> > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > John,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > It's exciting to see this KIP head in
> > > this
> > > > > > direction!  In
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > last
> > > > > > > > > > > > > > > > > year
> > > > > > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > > > > > so I've tried to sketch out some
> > > usability
> > > > > > improvements for
> > > > > > > > > > > > > > > custom
> > > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > > > stores, and I also ended up splitting
> > > out the
> > > > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > > from
> > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > ProcessorContext in an attempt to
> > > facilitate
> > > > > > what I was
> > > > > > > > > > > > > doing.  I
> > > > > > > > > > > > > > > > > sort of
> > > > > > > > > > > > > > > > > > > > abandoned it when I realized how
> large
> > > the
> > > > > > ideal change
> > > > > > > > > > > > might
> > > > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > > to be,
> > > > > > > > > > > > > > > > > > > > but it's great to see that there is
> other
> > > > > > interest in
> > > > > > > > > > > > moving
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > direction (from the folks that
> matter :)
> > > ).
> > > > > > > > > > > > > > > > > > > > Having taken a stab at it myself, I
> have
> > > a
> > > > > > comment/question
> > > > > > > > > > > > > on
> > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > bullet
> > > > > > > > > > > > > > > > > > > > about StateStoreContext:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > It does *not*  include anything
> > > processor- or
> > > > > > record-
> > > > > > > > > > > > > specific,
> > > > > > > > > > > > > > > like
> > > > > > > > > > > > > > > > > > > > > `forward()` or any information
> about
> > > the
> > > > > > "current"
> > > > > > > > > > > > record,
> > > > > > > > > > > > > > > which is
> > > > > > > > > > > > > > > > > > > > only
> > > > > > > > > > > > > > > > > > > > > well-defined in the context of the
> > > > > > Processor. Processors
> > > > > > > > > > > > > > > process
> > > > > > > > > > > > > > > > > one
> > > > > > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > > > > > > > at a time, but state stores may be
> > > used to
> > > > > > store and
> > > > > > > > > > > > fetch
> > > > > > > > > > > > > many
> > > > > > > > > > > > > > > > > > > records, so
> > > > > > > > > > > > > > > > > > > > > there is no "current record".
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I totally agree that record-specific
> or
> > > > > > processor-specific
> > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > in a
> > > > > > > > > > > > > > > > > > > > state store is often not well-defined
> > > and it
> > > > > > would be good
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > separate
> > > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > out, but sometimes it (at least
> > > > > > record-specific context) is
> > > > > > > > > > > > > > > actually
> > > > > > > > > > > > > > > > > > > > useful, for example, passing the
> record's
> > > > > > timestamp through
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > underlying storage (or changelog
> topic):
> > > > > > > > > > > > > > > > > > > >
> > >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > > > > > > > > > > > > You could have the writer client of
> the
> > > state
> > > > > > store pass
> > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > through,
> > > > > > > > > > > > > > > > > > > but
> > > > > > > > > > > > > > > > > > > > it would be nice to be able to write
> > > state
> > > > > > stores where the
> > > > > > > > > > > > > > > client
> > > > > > > > > > > > > > > > > did
> > > > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > > > have this responsibility.  I'm not
> sure
> > > if the
> > > > > > solution is
> > > > > > > > > > > > > to add
> > > > > > > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > > > > things back to StateStoreContext, or
> > > make yet
> > > > > > another
> > > > > > > > > > > > context
> > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > represents record-specific context
> while
> > > > > > inside a state
> > > > > > > > > > > > > store.
> > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John
> > > Roesler <
> > > > > > > > > > > > > j...@vvcephei.org>
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478
> > > forward
> > > > > > over the last
> > > > > > > > > > > > > year,
> > > > > > > > > > > > > > > > > > > > > and I'm happy to say that we're
> making
> > > good
> > > > > > progress now.
> > > > > > > > > > > > > > > > > > > > > However, several issues with the
> > > original
> > > > > > design have
> > > > > > > > > > > > come
> > > > > > > > > > > > > > > > > > > > > to light.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > The major changes:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > We discovered that the original
> plan
> > > of just
> > > > > > adding
> > > > > > > > > > > > generic
> > > > > > > > > > > > > > > > > > > > > parameters to ProcessorContext was
> too
> > > > > > disruptive, so we
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > > > > > > now adding a new
> api.ProcessorContext.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > That choice forces us to add a new
> > > > > > StateStore.init method
> > > > > > > > > > > > > > > > > > > > > for the new context, but
> > > ProcessorContext
> > > > > > really isn't
> > > > > > > > > > > > > ideal
> > > > > > > > > > > > > > > > > > > > > for state stores to begin with, so
> I'm
> > > > > > proposing a new
> > > > > > > > > > > > > > > > > > > > > StateStoreContext for this
> purpose. In
> > > a
> > > > > > nutshell, there
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > > > > > > quite a few methods in
> > > ProcessorContext that
> > > > > > actually
> > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > > > > never be called from inside a
> > > StateStore.
> > > > > > > > > > > > > > > > > > > > > Also, since there is a new
> > > ProcessorContext
> > > > > > interface, we
> > > > > > > > > > > > > > > > > > > > > need a new MockProcessorContext
> > > > > > implementation in the
> > > > > > > > > > > > test-
> > > > > > > > > > > > > > > > > > > > > utils module.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > The changeset for the KIP document
> is
> > > here:
> > > > > > > > > > > > > > > > > > > > >
> > >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > > > > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > > > > > > > > > > > > If you have any concerns, please
> let
> > > me know!
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
>
>
