>
> Regarding your first sentence, "...the processor would null
> out the record context...", this is not possible, since the
> processor doesn't have write access to the context. We could
> add it,
>

Sorry, this was poorly phrased. I definitely did not mean to imply that we
should make the context modifiable by the Processors themselves. I meant
that this should be handled by the internal processing framework that deals
with passing records from one Processor to the next, setting the record
context when a new record is picked up, invoking the punctuators, etc. I
believe this all currently happens in the StreamTask? It already can and does
overwrite the record context as new records are processed, and it is also
responsible for calling the punctuators, so it doesn't seem like a huge leap
to just say "null out the current record before punctuating".

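To make that concrete, here is a rough sketch of what I have in mind. This
is not the actual StreamTask code: ToyTask, ToyRecordContext and
ReadOnlyContext are made-up stand-ins, purely for illustration. The only
point is that the framework owns the record context, user code only ever
reads it, and the framework clears it before a punctuator runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the record context (not a Kafka Streams class).
final class ToyRecordContext {
    final String topic;
    final long offset;

    ToyRecordContext(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }
}

// Read-only view handed to user code: deliberately no setter.
interface ReadOnlyContext {
    ToyRecordContext currentRecord(); // null when there is no current record
}

// Hypothetical stand-in for the framework-side task that drives user code.
final class ToyTask implements ReadOnlyContext {
    private ToyRecordContext current;

    @Override
    public ToyRecordContext currentRecord() {
        return current;
    }

    // The framework sets the context when it picks up a new record.
    void process(ToyRecordContext record, Consumer<ReadOnlyContext> processor) {
        current = record;
        processor.accept(this);
    }

    // The framework nulls out the current record before punctuating.
    void punctuate(Consumer<ReadOnlyContext> punctuator) {
        current = null;
        punctuator.accept(this);
    }
}

final class ToyTaskDemo {
    // Runs the same user code from process() and from punctuate().
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Consumer<ReadOnlyContext> userCode = ctx -> {
            ToyRecordContext rc = ctx.currentRecord();
            seen.add(rc == null ? "no record" : rc.topic + "@" + rc.offset);
        };
        ToyTask task = new ToyTask();
        task.process(new ToyRecordContext("input", 42L), userCode);
        task.punctuate(userCode);
        return seen; // ["input@42", "no record"]
    }
}
```

Without the `current = null` line in punctuate(), the second call would still
see "input@42", which is the kind of stale-context bug we are talking about.
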
To clarify, I was never advocating or even considering giving the Processors
write access to the record context. Sorry if my last message (or all of them)
was misleading. I just wanted to point out that the punctuator concern is
orthogonal to the question of whether we should include the record context
in the StateStoreContext. It's definitely a real problem, but it's a problem
that exists at the Processor level and not just in the StateStore.

So, I don't think there is any reason we *can't* retain the record context
in the StateStoreContext, and if any users came along with a clear use case
I'd find that convincing. In the absence of any examples, the conservative
approach sounds good to me.
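
For what it's worth, this is roughly how I picture the conservative approach
from the caller's side. Again, just a sketch with invented names (ToyStore,
StampedValue, ExplicitTimestampDemo), not real Streams classes: whoever calls
put() passes along whatever the store needs, so the store never has to look
anything up in a record context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value wrapper: the timestamp travels inside the value.
final class StampedValue {
    final String value;
    final long timestamp;

    StampedValue(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical store with no access to any record context.
final class ToyStore {
    private final Map<String, StampedValue> data = new HashMap<>();

    void put(String key, StampedValue value) {
        data.put(key, value);
    }

    StampedValue get(String key) {
        return data.get(key);
    }
}

final class ExplicitTimestampDemo {
    // Called while processing a record: the caller knows the record timestamp.
    static void onRecord(ToyStore store, String key, String value, long recordTimestamp) {
        store.put(key, new StampedValue(value, recordTimestamp));
    }

    // Called from a punctuator: there is no current record, so the caller
    // decides which timestamp applies (here, the punctuation time).
    static void onPunctuate(ToyStore store, String key, long punctuationTime) {
        StampedValue old = store.get(key);
        if (old != null) {
            store.put(key, new StampedValue(old.value, punctuationTime));
        }
    }
}
```

Either way the store sees a single channel of information, and the punctuator
case stops being special from the store's point of view.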

If it turns out that someone does need the record context in their custom
state store, I'm sure they'll submit a politely worded bug report alerting us
that we broke their application.

On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks, Sophie,
>
> Yes, now that you point it out, I can see that the record
> context itself should be nulled out by Streams before
> invoking punctuators. From that perspective, we don't need
> to think about the second-order problem of what's in the
> context for the state store when called from a punctuator.
>
> Regarding your first sentence, "...the processor would null
> out the record context...", this is not possible, since the
> processor doesn't have write access to the context. We could
> add it, but then all kinds of strange effects would ensue
> when downstream processors execute but the context is empty,
> etc. Better to just let the framework manage the record
> context and keep it read-only for Processors.
>
> Reading between the lines of your last reply, it sounds like
> the disconnect may just have been a mutual misunderstanding
> about whether or not Processors currently have access to set
> the record context. Since they do not, if we wanted to add
> the record context to StateStoreContext in a well-defined
> way, we'd also have to add the ability for Processors to
> manipulate it. But then, we're just creating a side-channel
> for Processors to pass some information in arguments to
> "put()" and other information implicitly through the
> context. It seems better just to go for a single channel for
> now.
>
> It sounds like you're basically in favor of the conservative
> approach, and you just wanted to understand the blockers
> that I implied. Does my clarification make sense?
>
> Thanks,
> -John
>
> On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman
> wrote:
> > I was just thinking that the processor would null out the record context
> > after it finished processing the record, so I'm not sure I follow why this
> > would not be possible? AFAIK we never call a punctuator in the middle of
> > processing a record through the topology, and even if we did, we still know
> > when it is about to be called and could set it to null beforehand.
> >
> > I'm not trying to advocate for it here, I'm in agreement that anything you
> > want to access within the store can and should be accessed within the
> > calling Processor/Punctuator before reaching the store. The "we can always
> > add it later if necessary" argument is also pretty convincing. Just trying
> > to understand why this wouldn't be possible.
> >
> > FWIW, the question of "what is the current record in the context of a
> > Punctuator" exists independently of whether we want to add this to the
> > StateStoreContext or not. The full ProcessorContext, including the current
> > record context, is already available within a Punctuator, so removing the
> > current record context from the StateStoreContext does not solve the
> > problem. Users can -- and have (see KAFKA-9584
> > <https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit such subtle bugs
> > without ever invoking a StateStore from their punctuator.
> >
> > Again, I think I do agree that we should leave the current record context
> > off of the StateStoreContext, but I don't think the Punctuator argument
> > against it is very convincing. It sounds to me like we need to disallow
> > access to the current record context from within the Punctuator,
> > independent of anything to do with state stores.
> >
> > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks for the thoughts, Sophie.
> > >
> > > I agree that the extra information could be useful. My only concern is
> > > that it doesn’t seem like we can actually supply that extra information
> > > correctly. So, then we have a situation where the system offers useful
> > > API calls that are only correct in a narrow range of use cases. Outside
> > > of those use cases, you get incorrect behavior.
> > >
> > > If it were possible to null out the context before you put a document to
> > > which the context doesn’t apply, then the concern would be mitigated. But
> > > it would still be pretty weird from the perspective of the store that
> > > sometimes the context is populated and other times, it’s null.
> > >
> > > But that seems moot, since it doesn’t seem possible to null out the
> > > context. Only the Processor could know whether it’s about to put a
> > > document different from the context or not. And it would be inappropriate
> > > to offer a public ProcessorContext api to manage the record context.
> > >
> > > Ultimately, it still seems like if you want to store headers, you can
> > > store them explicitly, right? That doesn’t seem onerous to me, and it
> > > kind of seems better than relying on undefined or asymmetrical behavior
> > > in the store itself.
> > >
> > > Anyway, I’m not saying that we couldn’t solve these problems. Just that
> > > it seems like we can be conservative and avoid them for now. If it turns
> > > out we really need to solve them, we can always do it later.
> > >
> > > Thanks,
> > > John
> > >
> > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > If you were to call "put" from a punctuator, or do a
> > > > > `range()` query and then update one of those records with
> > > > > `put()`, you'd have a very subtle bug on your hands.
> > > >
> > > > Can you elaborate on this a bit? I agree that the punctuator case is an
> > > > obvious exception to the assumption that store invocations always
> > > > have a corresponding "current record", but I don't understand the
> > > > second example. Are you envisioning a scenario where the #process
> > > > method performs a range query and then updates records? Or were
> > > > you just giving another example of the punctuator case?
> > > >
> > > > I only bring it up because I agree that the current record information
> > > > could still be useful within the context of the store. As a non-user my
> > > > input on this definitely has limited value, but it just isn't striking
> > > > me as obvious that we should remove access to the current record context
> > > > from the state stores. If there is no current record, as in the
> > > > punctuator case, we should just set the record context to null (or
> > > > Optional.empty, etc).
> > > >
> > > > That said, the put() always has to come from somewhere, and that
> > > > somewhere is always going to be either a Processor or a Punctuator, both
> > > > of which will still have access to the full context. So additional info
> > > > such as the timestamp can and should probably be supplied to the store
> > > > before calling put(), rather than looked up by the store. But I can see
> > > > some other things being useful, for example the current record's
> > > > headers. Maybe if/when we add better (or any) support for headers in
> > > > state stores this will be less true.
> > > >
> > > > Of course as John has made clear, it's pretty hard to judge without
> > > > examples and more insight as to what actually goes on within a custom
> > > > state store.
> > > >
> > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > >
> > > > > Hi Paul,
> > > > >
> > > > > It's good to hear from you!
> > > > >
> > > > > I'm glad you're in favor of the direction. Especially when
> > > > > it comes to public API and usability concerns, I tend to
> > > > > think that "the folks who matter" are actually the folks who
> > > > > have to use the APIs to accomplish real tasks. It can be
> > > > > hard for me to be sure I'm thinking clearly from that
> > > > > perspective.
> > > > >
> > > > > Funny story, I also started down this road a couple of times
> > > > > already and backed them out before the KIP because I was
> > > > > afraid of the scope of the proposal. Unfortunately, needing
> > > > > to make a new ProcessorContext kind of forced my hand.
> > > > >
> > > > > I see you've called me out about the ChangeLogging stores :)
> > > > > In fact, I think these are the main/only reason that stores
> > > > > might really need to invoke "forward()". My secret plan was
> > > > > to cheat and either accomplish change-logging by a different
> > > > > mechanism than implementing the store interface, or by just
> > > > > breaking encapsulation to sneak the "real" ProcessorContext
> > > > > into the ChangeLogging stores. But those are all
> > > > > implementation details. I think the key question is whether
> > > > > anyone else has a store implementation that needs to call
> > > > > "forward()". It's not what you mentioned, but since you
> > > > > spoke up, I'll just ask: if you have a use case for calling
> > > > > "forward()" in a store, please share it.
> > > > >
> > > > > Regarding the other record-specific context methods, I think
> > > > > you have a good point, but I also can't quite wrap my head
> > > > > around how we can actually guarantee it to work in general.
> > > > > For example, the case you cited, where the implementation of
> > > > > `KeyValueStore#put(key, value)` uses the context to augment
> > > > > the record with timestamp information. This relies on the
> > > > > assumption that you would only call "put()" from inside a
> > > > > `Processor#process(key, value)` call in which the record
> > > > > being processed is the same record that you're trying to put
> > > > > into the store.
> > > > >
> > > > > If you were to call "put" from a punctuator, or do a
> > > > > `range()` query and then update one of those records with
> > > > > `put()`, you'd have a very subtle bug on your hands. Right
> > > > > now, the Streams component that actually calls the Processor
> > > > > takes care to set the right record context before invoking
> > > > > the method, and in the case of caching, etc., it also takes
> > > > > care to swap out the old context and keep it somewhere safe.
> > > > > But when it comes to public API Processors calling methods
> > > > > on StateStores, there's no opportunity for any component to
> > > > > make sure the context is always correct.
> > > > >
> > > > > In the face of that situation, it seemed better to just move
> > > > > in the direction of a "normal" data store. I.e., when you
> > > > > use a HashMap or RocksDB or other "state stores", you don't
> > > > > expect them to automatically know extra stuff about the
> > > > > record you're storing. If you need them to know something,
> > > > > you just put it in the value.
> > > > >
> > > > > All of that said, I'm just reasoning from first principles
> > > > > here. To really know if this is a mistake or not, I need to
> > > > > be in your place. So please push back if you think what I
> > > > > said is nonsense. My personal plan was to keep an eye out
> > > > > during the period where the old API was still present, but
> > > > > deprecated, to see if people were struggling to use the new
> > > > > API. If so, then we'd have a chance to address it before
> > > > > dropping the old API. But it's even better if you can help
> > > > > think it through now.
> > > > >
> > > > > It did also cross my mind to _not_ add the
> > > > > StateStoreContext, but just to continue to punt on the
> > > > > question by just dropping in the new ProcessorContext to the
> > > > > new init method. If StateStoreContext seems too bold, we can
> > > > > go that direction. But if we actually add some methods to
> > > > > StateStoreContext, I'd like to be able to ensure they would
> > > > > be well defined. I think the current situation was more of
> > > > > an oversight than a choice.
> > > > >
> > > > > Thanks again for your reply,
> > > > > -John
> > > > >
> > > > >
> > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > John,
> > > > > >
> > > > > > It's exciting to see this KIP head in this direction! In the last year
> > > > > > or so I've tried to sketch out some usability improvements for custom
> > > > > > state stores, and I also ended up splitting out the StateStoreContext
> > > > > > from the ProcessorContext in an attempt to facilitate what I was
> > > > > > doing. I sort of abandoned it when I realized how large the ideal
> > > > > > change might have to be, but it's great to see that there is other
> > > > > > interest in moving in this direction (from the folks that matter :) ).
> > > > > >
> > > > > > Having taken a stab at it myself, I have a comment/question on this
> > > > > > bullet about StateStoreContext:
> > > > > >
> > > > > > > It does *not* include anything processor- or record-specific, like
> > > > > > > `forward()` or any information about the "current" record, which is
> > > > > > > only well-defined in the context of the Processor. Processors
> > > > > > > process one record at a time, but state stores may be used to store
> > > > > > > and fetch many records, so there is no "current record".
> > > > > > >
> > > > > >
> > > > > > I totally agree that record-specific or processor-specific context in
> > > > > > a state store is often not well-defined and it would be good to
> > > > > > separate that out, but sometimes it (at least record-specific context)
> > > > > > is actually useful, for example, passing the record's timestamp
> > > > > > through to the underlying storage (or changelog topic):
> > > > > >
> > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > >
> > > > > > You could have the writer client of the state store pass this through,
> > > > > > but it would be nice to be able to write state stores where the client
> > > > > > did not have this responsibility. I'm not sure if the solution is to
> > > > > > add some things back to StateStoreContext, or make yet another context
> > > > > > that represents record-specific context while inside a state store.
> > > > > >
> > > > > > Best,
> > > > > > Paul
> > > > > >
> > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
> > > > > > > Hello all,
> > > > > > >
> > > > > > > I've been slowly pushing KIP-478 forward over the last year,
> > > > > > > and I'm happy to say that we're making good progress now.
> > > > > > > However, several issues with the original design have come
> > > > > > > to light.
> > > > > > >
> > > > > > > The major changes:
> > > > > > >
> > > > > > > We discovered that the original plan of just adding generic
> > > > > > > parameters to ProcessorContext was too disruptive, so we are
> > > > > > > now adding a new api.ProcessorContext.
> > > > > > >
> > > > > > > That choice forces us to add a new StateStore.init method
> > > > > > > for the new context, but ProcessorContext really isn't ideal
> > > > > > > for state stores to begin with, so I'm proposing a new
> > > > > > > StateStoreContext for this purpose. In a nutshell, there are
> > > > > > > quite a few methods in ProcessorContext that actually should
> > > > > > > never be called from inside a StateStore.
> > > > > > >
> > > > > > > Also, since there is a new ProcessorContext interface, we
> > > > > > > need a new MockProcessorContext implementation in the test-
> > > > > > > utils module.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > The changeset for the KIP document is here:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > >
> > > > > > > And the KIP itself is here:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > >
> > > > > > > If you have any concerns, please let me know!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > >
>
>
