Aha, I did misinterpret the example in your previous response regarding the
range query after all. I thought you just meant a time-range query inside a
punctuator. It genuinely did not occur to me that users might be looking up
and/or updating records of other keys from within a Processor. Sorry for
being closed minded

I won't drag out this discussion any further by asking whether that might be
a valid use case or just a lurking bug in itself :)

Thanks for humoring me. The current proposal for KIP-478 sounds good to me

On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote:

> Ah, thanks Sophie,
>
> I'm sorry for misinterpreting your resonse. Yes, we
> absolutely can and should clear the context before
> punctuating.
>
> My secondary concern is maybe more far-fetched. I was
> thinking that inside process(key,value), a Processor might
> do a get/put of a _different_ key. Consider, for example,
> the way that Suppress processors work. When they get a
> record, they add it to the store and then do a range scan
> and possibly forward a _different_ record. Of course, this
> is an operation that is deeply coupled to the internals, and
> the Suppress processor accordingly actually does get access
> to the internal context so that it can set the context
> before forwarding.
>
> Still, it seems like I've had a handful of conversations
> with people over the years in which they tell me they are
> using state stores in a way that transcends the "get and put
> the currently processing record" access pattern. I doubt
> that those folks would even have considered the possiblity
> that the currently processing record's _context_ could
> pollute their state store operations, as I myself never gave
> it a second thought until the current conversation began. In
> cases like that, we have actually set a trap for these
> people, and it seems better to dismantle the trap.
>
> As you noted, really the only people who would be negatively
> impacted are people who implement their own state stores.
> These folks will get the deprecation warning and try to
> adapt their stores to the new interface. If they needed
> access to the record context, they would find it's now
> missing. They'd ask us about it, and we'd have the ability
> to explain the lurking bug that they have had in their
> stores all along, as well as the new recommended pattern
> (just pass everything you need in the value). If that's
> unsatisfying, _then_ we should consider amending the API.
>
> Thanks,
> -John
>
> On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
> wrote:
> > > Regarding your first sentence, "...the processor would null
> > > out the record context...", this is not possible, since the
> > > processor doesn't have write access to the context. We could
> > > add it,
> > >
> >
> > Sorry, this was poorly phrased, I definitely did not mean to imply that
> we
> > should make the context modifiable by the Processors themselves. I meant
> > this should be handled by the internal processing framework that deals
> with
> > passing records from one Processor to the next, setting the record
> context
> > when a new record is picked up, invoking the punctuators, etc. I believe
> > this
> > all currently happens in the StreamTask? It already can and does
> overwrite
> > the record context as new records are processed, and is also responsible
> > for calling the punctuators, so it doesn't seem like a huge leap to just
> say
> > "null out the current record before punctuating"
> >
> > To clarify, I was never advocating or even considering to give the
> > Processors
> > write access to the record context. Sorry if my last message (or all of
> > them)
> > was misleading. I just wanted to point out that the punctuator concern is
> > orthogonal to the question of whether we should include the record
> context
> > in the StateStoreContext. It's definitely a real problem, but it's a
> > problem
> > that exists at the Processor level and not just the StateStore.
> >
> > So, I don't think there is any reason we *can't* retain the record
> context
> > in the
> > StateStoreContext, and if any users came along with a clear use case I'd
> > find
> > that convincing. In the absence of any examples, the conservative
> approach
> > sounds good to me.
> >
> > If it turns out that someone did need the record context in their custom
> > state
> > store, I'm sure they'll submit a politely worded bug report alerting us
> > that we
> > broke their application.
> >
> > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org>
> wrote:
> >
> > > Thanks, Sophie,
> > >
> > > Yes, now that you point it out, I can see that the record
> > > context itself should be nulled out by Streams before
> > > invoking punctuators. From that perspective, we don't need
> > > to think about the second-order problem of what's in the
> > > context for the state store when called from a punctuator.
> > >
> > > Regarding your first sentence, "...the processor would null
> > > out the record context...", this is not possible, since the
> > > processor doesn't have write access to the context. We could
> > > add it, but then all kinds of strange effects would ensue
> > > when downstream processors execute but the context is empty,
> > > etc. Better to just let the framework manage the record
> > > context and keep it read-only for Processors.
> > >
> > > Reading between the lines of your last reply, it sounds that
> > > the disconnect may just have been a mutual misunderstanding
> > > about whether or not Processors currently have access to set
> > > the record context. Since they do not, if we wanted to add
> > > the record context to StateStoreContext in a well-defined
> > > way, we'd also have to add the ability for Processors to
> > > manipulate it. But then, we're just creating a side-channel
> > > for Processors to pass some information in arguments to
> > > "put()" and other information implicitly through the
> > > context. It seems better just to go for a single channel for
> > > now.
> > >
> > > It sounds like you're basically in favor of the conservative
> > > approach, and you just wanted to understand the blockers
> > > that I implied. Does my clarification make sense?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman
> > > wrote:
> > > > I was just thinking that the processor would null out the record
> context
> > > > after it
> > > > finished processing the record, so I'm not sure I follow why this
> would
> > > not
> > > > be
> > > > possible? AFAIK we never call a punctuator in the middle of
> processing a
> > > > record through the topology, and even if we did, we still know when
> it is
> > > > about
> > > > to be called and could set it to null beforehand.
> > > >
> > > > I'm not trying to advocate for it here, I'm in agreement that
> anything
> > > you
> > > > want
> > > > to access within the store can and should be accessed within the
> calling
> > > > Processor/Punctuator before reaching the store. The "we can always
> add it
> > > > later if necessary" argument is also pretty convincing. Just trying
> to
> > > > understand
> > > > why this wouldn't be possible.
> > > >
> > > > FWIW, the question of "what is the current record in the context of a
> > > > Punctuator"
> > > > exists independently of whether we want to add this to the
> > > StateStoreContext
> > > > or not. The full ProcessorContext, including the current record
> context,
> > > is
> > > > already available within a Punctuator, so removing the current record
> > > > context
> > > > from the StateStoreContext does not solve the problem. Users can --
> and
> > > have
> > > > (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>;;)
> --
> > > hit
> > > > such subtle bugs without ever invoking a StateStore
> > > > from their punctuator.
> > > >
> > > > Again, I think I do agree that we should leave the current record
> context
> > > > off of
> > > > the StateStoreContext, but I don't think the Punctuator argument
> against
> > > it
> > > > is
> > > > very convincing. It sounds to me like we need to disallow access to
> the
> > > > current
> > > > record context from within the Punctuator, independent of anything
> to do
> > > > with
> > > > state stores
> > > >
> > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org>
> > > wrote:
> > > > > Thanks for the thoughts, Sophie.
> > > > >
> > > > > I agree that the extra information could be useful. My only
> concern is
> > > > > that it doesn’t seem like we can actually supply that extra
> information
> > > > > correctly. So, then we have a situation where the system offers
> useful
> > > API
> > > > > calls that are only correct in a narrow range of use cases.
> Outside of
> > > > > those use cases, you get incorrect behavior.
> > > > >
> > > > > If it were possible to null out the context before you put a
> document
> > > to
> > > > > which the context doesn’t apply, then the concern would be
> mitigated.
> > > But
> > > > > it would still be pretty weird from the perspective of the store
> that
> > > > > sometimes the context is populated and other times, it’s null.
> > > > >
> > > > > But that seems moot, since it doesn’t seem possible to null out the
> > > > > context. Only the Processor could know whether it’s about to put a
> > > document
> > > > > different from the context or not. And it would be inappropriate to
> > > offer a
> > > > > public ProcessorContext api to manage the record context.
> > > > >
> > > > > Ultimately, it still seems like if you want to store headers, you
> can
> > > > > store them explicitly, right? That doesn’t seem onerous to me, and
> it
> > > kind
> > > > > of seems better than relying on undefined or asymmetrical behavior
> in
> > > the
> > > > > store itself.
> > > > >
> > > > > Anyway, I’m not saying that we couldn’t solve these problems. Just
> > > that it
> > > > > seems a little that we can be conservative and avoid them for now.
> If
> > > it
> > > > > turns out we really need to solve them, we can always do it later.
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > `range()` query and then update one of those records with
> > > > > > > `put()`, you'd have a very subtle bug on your hands.
> > > > > >
> > > > > > Can you elaborate on this a bit? I agree that the punctuator
> case is
> > > an
> > > > > > obvious exemption to the assumption that store invocations always
> > > > > > have a corresponding "current record", but I don't understand the
> > > > > > second example. Are you envisioning a scenario where the #process
> > > > > > method performs a range query and then updates records? Or were
> > > > > > you just giving another example of the punctuator case?
> > > > > >
> > > > > > I only bring it up because I agree that the current record
> > > information
> > > > > could
> > > > > > still be useful within the context of the store. As a non-user my
> > > input
> > > > > on
> > > > > > this
> > > > > > definitely has limited value, but it just isn't striking me as
> > > obvious
> > > > > that
> > > > > > we
> > > > > > should remove access to the current record context from the state
> > > stores.
> > > > > > If there is no current record, as in the  punctuator case, we
> should
> > > just
> > > > > > set
> > > > > > the record context to null (or Optional.empty, etc).
> > > > > >
> > > > > > That said, the put() always has to come from somewhere, and that
> > > > > > somewhere is always going to be either a Processor or a
> Punctuator,
> > > both
> > > > > > of which will still have access to the full context. So
> additional
> > > info
> > > > > > such as
> > > > > > the timestamp can and should probably be supplied to the store
> before
> > > > > > calling put(), rather than looked up by the store. But I can see
> some
> > > > > other
> > > > > > things being useful, for example the current record's headers.
> Maybe
> > > > > if/when
> > > > > > we add better (or any) support for headers in state stores this
> will
> > > be
> > > > > > less true.
> > > > > >
> > > > > > Of course as John has made clear, it's pretty hard to judge
> without
> > > > > > examples
> > > > > > and more insight as to what actually goes on within a custom
> state
> > > store
> > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org
> >
> > > wrote:
> > > > > > > Hi Paul,
> > > > > > >
> > > > > > > It's good to hear from you!
> > > > > > >
> > > > > > > I'm glad you're in favor of the direction. Especially when
> > > > > > > it comes to public API and usability concens, I tend to
> > > > > > > think that "the folks who matter" are actually the folks who
> > > > > > > have to use the APIs to accomplish real tasks. It can be
> > > > > > > hard for me to be sure I'm thinking clearly from that
> > > > > > > perspective.
> > > > > > >
> > > > > > > Funny story, I also started down this road a couple of times
> > > > > > > already and backed them out before the KIP because I was
> > > > > > > afraid of the scope of the proposal. Unfortunately, needing
> > > > > > > to make a new ProcessorContext kind of forced my hand.
> > > > > > >
> > > > > > > I see you've called me out about the ChangeLogging stores :)
> > > > > > > In fact, I think these are the main/only reason that stores
> > > > > > > might really need to invoke "forward()". My secret plan was
> > > > > > > to cheat and either accomplish change-logging by a different
> > > > > > > mechanism than implementing the store interface, or by just
> > > > > > > breaking encapsulation to sneak the "real" ProcessorContext
> > > > > > > into the ChangeLogging stores. But those are all
> > > > > > > implementation details. I think the key question is whether
> > > > > > > anyone else has a store implementation that needs to call
> > > > > > > "forward()". It's not what you mentioned, but since you
> > > > > > > spoke up, I'll just ask: if you have a use case for calling
> > > > > > > "forward()" in a store, please share it.
> > > > > > >
> > > > > > > Regarding the other record-specific context methods, I think
> > > > > > > you have a good point, but I also can't quite wrap my head
> > > > > > > around how we can actually guarantee it to work in general.
> > > > > > > For example, the case you cited, where the implementation of
> > > > > > > `KeyValueStore#put(key, value)` uses the context to augment
> > > > > > > the record with timestamp information. This relies on the
> > > > > > > assumption that you would only call "put()" from inside a
> > > > > > > `Processor#process(key, value)` call in which the record
> > > > > > > being processed is the same record that you're trying to put
> > > > > > > into the store.
> > > > > > >
> > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > `range()` query and then update one of those records with
> > > > > > > `put()`, you'd have a very subtle bug on your hands. Right
> > > > > > > now, the Streams component that actually calls the Processor
> > > > > > > takes care to set the right record context before invoking
> > > > > > > the method, and in the case of caching, etc., it also takes
> > > > > > > care to swap out the old context and keep it somewhere safe.
> > > > > > > But when it comes to public API Processors calling methods
> > > > > > > on StateStores, there's no opportunity for any component to
> > > > > > > make sure the context is always correct.
> > > > > > >
> > > > > > > In the face of that situation, it seemed better to just move
> > > > > > > in the direction of a "normal" data store. I.e., when you
> > > > > > > use a HashMap or RocksDB or other "state stores", you don't
> > > > > > > expect them to automatically know extra stuff about the
> > > > > > > record you're storing. If you need them to know something,
> > > > > > > you just put it in the value.
> > > > > > >
> > > > > > > All of that said, I'm just reasoning from first principles
> > > > > > > here. To really know if this is a mistake or not, I need to
> > > > > > > be in your place. So please push back if you think what I
> > > > > > > said is nonsense. My personal plan was to keep an eye out
> > > > > > > during the period where the old API was still present, but
> > > > > > > deprecated, to see if people were struggling to use the new
> > > > > > > API. If so, then we'd have a chance to address it before
> > > > > > > dropping the old API. But it's even better if you can help
> > > > > > > think it through now.
> > > > > > >
> > > > > > > It did also cross my mind to _not_ add the
> > > > > > > StateStoreContext, but just to continue to punt on the
> > > > > > > question by just dropping in the new ProcessorContext to the
> > > > > > > new init method. If StateStoreContext seems too bold, we can
> > > > > > > go that direction. But if we actually add some methods to
> > > > > > > StateStoreContext, I'd like to be able to ensure they would
> > > > > > > be well defined. I think the current situation was more of
> > > > > > > an oversight than a choice.
> > > > > > >
> > > > > > > Thanks again for your reply,
> > > > > > > -John
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > > > John,
> > > > > > > >
> > > > > > > > It's exciting to see this KIP head in this direction!  In the
> > > last
> > > > > year
> > > > > > > or
> > > > > > > > so I've tried to sketch out some usability improvements for
> > > custom
> > > > > state
> > > > > > > > stores, and I also ended up splitting out the
> StateStoreContext
> > > from
> > > > > the
> > > > > > > > ProcessorContext in an attempt to facilitate what I was
> doing.  I
> > > > > sort of
> > > > > > > > abandoned it when I realized how large the ideal change might
> > > have
> > > > > to be,
> > > > > > > > but it's great to see that there is other interest in moving
> in
> > > this
> > > > > > > > direction (from the folks that matter :) ).
> > > > > > > >
> > > > > > > > Having taken a stab at it myself, I have a comment/question
> on
> > > this
> > > > > > > bullet
> > > > > > > > about StateStoreContext:
> > > > > > > >
> > > > > > > > It does *not*  include anything processor- or record-
> specific,
> > > like
> > > > > > > > > `forward()` or any information about the "current" record,
> > > which is
> > > > > > > only a
> > > > > > > > > well-defined in the context of the Processor. Processors
> > > process
> > > > > one
> > > > > > > record
> > > > > > > > > at a time, but state stores may be used to store and fetch
> many
> > > > > > > records, so
> > > > > > > > > there is no "current record".
> > > > > > > > >
> > > > > > > >
> > > > > > > > I totally agree that record-specific or processor-specific
> > > context
> > > > > in a
> > > > > > > > state store is often not well-defined and it would be good to
> > > > > separate
> > > > > > > that
> > > > > > > > out, but sometimes it (at least record-specific context) is
> > > actually
> > > > > > > > useful, for example, passing the record's timestamp through
> to
> > > the
> > > > > > > > underlying storage (or changelog topic):
> > > > > > > >
> > >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > You could have the writer client of the state store pass this
> > > > > through,
> > > > > > > but
> > > > > > > > it would be nice to be able to write state stores where the
> > > client
> > > > > did
> > > > > > > not
> > > > > > > > have this responsibility.  I'm not sure if the solution is
> to add
> > > > > some
> > > > > > > > things back to StateStoreContext, or make yet another context
> > > that
> > > > > > > > represents record-specific context while inside a state
> store.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Paul
> > > > > > > >
> > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <
> j...@vvcephei.org>
> > > > > wrote:
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > I've been slowly pushing KIP-478 forward over the last
> year,
> > > > > > > > > and I'm happy to say that we're making good progress now.
> > > > > > > > > However, several issues with the original design have come
> > > > > > > > > to light.
> > > > > > > > >
> > > > > > > > > The major changes:
> > > > > > > > >
> > > > > > > > > We discovered that the original plan of just adding generic
> > > > > > > > > parameters to ProcessorContext was too disruptive, so we
> are
> > > > > > > > > now adding a new api.ProcessorContext.
> > > > > > > > >
> > > > > > > > > That choice forces us to add a new StateStore.init method
> > > > > > > > > for the new context, but ProcessorContext really isn't
> ideal
> > > > > > > > > for state stores to begin with, so I'm proposing a new
> > > > > > > > > StateStoreContext for this purpose. In a nutshell, there
> are
> > > > > > > > > quite a few methods in ProcessorContext that actually
> should
> > > > > > > > > never be called from inside a StateStore.
> > > > > > > > >
> > > > > > > > > Also, since there is a new ProcessorContext interface, we
> > > > > > > > > need a new MockProcessorContext implementation in the test-
> > > > > > > > > utils module.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > The changeset for the KIP document is here:
> > > > > > > > >
> > > > > > > > >
> > >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > And the KIP itself is here:
> > > > > > > > >
> > > > > > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > If you have any concerns, please let me know!
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > -John
> > > > > > > > >
> > > > > > > > >
>
>

Reply via email to