Thanks.

It would be good to add the concrete interfaces of the new classes to the KIP, i.e.,

 - FixedKeyProcessorSupplier
 - FixedKeyProcessor
 - FixedKeyProcessorContext
 - FixedKeyRecord
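As an illustration of what that could look like, here is a simplified, self-contained Java sketch of the four types. It is a model written for this thread, not the actual Kafka Streams code: real types would also deal with headers, state stores, punctuation and so on, and the `UpperCaseProcessor` and `runOnce` helpers are hypothetical. What it shows is the structure: `FixedKeyRecord` has no `withKey()`, its constructor is package-private, and the context can only forward `FixedKeyRecord`s.

```java
import java.util.ArrayList;
import java.util.List;

public class FixedKeyApiSketch {

    // A record whose key is fixed at construction time: there is no withKey().
    public static final class FixedKeyRecord<K, V> {
        private final K key;
        private final V value;
        private final long timestamp;

        // Package-private: in the proposal only the framework creates these,
        // so user code cannot attach a different key.
        FixedKeyRecord(final K key, final V value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        public K key() { return key; }
        public V value() { return value; }
        public long timestamp() { return timestamp; }

        // Replacing the value keeps the original key (and its type K).
        public <NewV> FixedKeyRecord<K, NewV> withValue(final NewV newValue) {
            return new FixedKeyRecord<>(key, newValue, timestamp);
        }
    }

    // The context can only forward FixedKeyRecords, never a record with a new key.
    public interface FixedKeyProcessorContext<KForward, VForward> {
        void forward(FixedKeyRecord<KForward, VForward> record);
    }

    // No KOut type parameter: the output key type is always KIn.
    public interface FixedKeyProcessor<KIn, VIn, VOut> {
        default void init(final FixedKeyProcessorContext<KIn, VOut> context) { }
        void process(FixedKeyRecord<KIn, VIn> record);
        default void close() { }
    }

    @FunctionalInterface
    public interface FixedKeyProcessorSupplier<KIn, VIn, VOut> {
        FixedKeyProcessor<KIn, VIn, VOut> get();
    }

    // Hypothetical user processor: upper-cases each value and leaves the key alone.
    static final class UpperCaseProcessor implements FixedKeyProcessor<String, String, String> {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(final FixedKeyProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    // Plays the framework's role: creates the record and collects what gets forwarded.
    public static List<FixedKeyRecord<String, String>> runOnce(final String key, final String value) {
        final List<FixedKeyRecord<String, String>> forwarded = new ArrayList<>();
        final FixedKeyProcessorSupplier<String, String, String> supplier = UpperCaseProcessor::new;
        final FixedKeyProcessor<String, String, String> processor = supplier.get();
        processor.init(forwarded::add);
        processor.process(new FixedKeyRecord<>(key, value, 0L));
        processor.close();
        return forwarded;
    }

    public static void main(final String[] args) {
        final FixedKeyRecord<String, String> out = runOnce("k1", "hello").get(0);
        System.out.println(out.key() + " -> " + out.value());
    }
}
```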


-Matthias


On 3/10/22 3:15 PM, Jorge Esteban Quilcate Otoya wrote:
Thanks all!

I agree with Matthias and Jon on going forward with the new
`FixedKeyRecord` approach.
The KIP has been updated accordingly.

Feel free to add your vote or amend on the vote thread if needed.

Cheers,

On Mon, 7 Mar 2022 at 21:57, Matthias J. Sax <mj...@apache.org> wrote:

I think it's ok that we cannot prevent users from mutating a given
read-only object. We have similar issues "all over the place" in the
API, because that's unfortunately just how Java works (e.g.,
`ValueMapperWithKey` and similar interfaces).

The point is that the API clearly expresses that the key should not
be changed, as `FixedKeyRecord` has no `withKey()` method, which is
much better than having `Record.withKey()` and thus incorrectly
indicating to users that it would be ok to set a new key.

I think it's worth adding the new interfaces.


-Matthias


On 3/7/22 11:46 AM, Guozhang Wang wrote:
Thanks John! I feel a bit ashamed of just thinking out loud here without
trying out prototypes myself :P

I think the FixedKeyProcessor/Record looks very good -- like you said,
since we are making a new set of APIs, why not reconsider more boldly
-- but at the same time I'd also like to make sure we agree on how much
"safety" we can achieve at runtime: even with the proposed APIs, we
cannot prevent users from doing something like:

---------------
process(FixedKeyRecord<K, V> inputRecord) {
    // Not preventable by runtime key validation either, since we only
    // check that the key object itself is not replaced:
    inputRecord.key().modifyField(...);
    context.forward(inputRecord);
}
---------------
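A tiny self-contained Java sketch of that point (plain Java, no Kafka dependency; `MutableKey` and `keyUnchanged` are hypothetical names): a reference-equality check of the kind discussed in this thread sees the same key object before and after the in-place mutation, so it cannot catch it.

```java
public class InPlaceKeyMutation {

    // A deliberately mutable key type, standing in for e.g. a POJO or JSON key.
    static final class MutableKey {
        String userId;

        MutableKey(final String userId) {
            this.userId = userId;
        }
    }

    // The cheap check discussed in this thread: is it still the same key object?
    static boolean keyUnchanged(final Object keyBeforeProcess, final Object keyOnForward) {
        return keyBeforeProcess == keyOnForward;
    }

    public static void main(final String[] args) {
        final MutableKey key = new MutableKey("user-1");
        final Object saved = key;   // the framework remembers the input key
        key.userId = "user-2";      // the processor mutates the key in place
        // Prints "true": the reference check cannot see the mutation.
        System.out.println(keyUnchanged(saved, key));
    }
}
```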

I.e., with either type safety or runtime validation, we cannot be 100%
sure that users will not do anything wrong. This makes me wonder how
much we'd like to pay to "remind" (rather than "enforce", since we
cannot really do that) users of the semantics of "processValues".
Personally, I felt that adding a new set of APIs for that purpose alone
is a bit overkill, and hence was leaning towards just the runtime
validation. But I admit this is purely subjective, so I'm willing to
yield to the group if others feel it's worth doing.


Guozhang



On Mon, Mar 7, 2022 at 10:32 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

Thanks, John!
This looks very promising.

I will familiarize myself with this approach and update the KIP
accordingly. From what I can see so far, this should cover most of the
open issues in this proposal.

PS.

Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.

Agree. I was referring to the value transformers, where `readOnlyKey` is
passed but not forwarded internally. As for the "forwarding disabled"
approach, you're totally right that it is a runtime validation.
Regardless, the approach proposed here will be a much better one.


On Sun, 6 Mar 2022 at 18:59, John Roesler <vvcep...@apache.org> wrote:

Hello all,

It seems like we're making good progress on this discussion.
If I'm keeping track correctly, if we can resolve this
question about how to handle processValues(), then we should
be able to finalize the vote, right?

I share Matthias's preference for having a type-safe API.

Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.
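To illustrate the distinction, a minimal sketch of a "forwarding disabled" context, using simplified hypothetical types rather than Kafka's real classes (the exception type is also an assumption): the `forward` method is still part of the type, so the call compiles, and the restriction only surfaces when it runs.

```java
public class ForwardingDisabledSketch {

    // Simplified stand-in for a processor context.
    interface Context<K, V> {
        void forward(K key, V value);
    }

    // The method exists, so calls compile; the restriction only shows up at runtime.
    static final class ForwardingDisabledContext<K, V> implements Context<K, V> {
        @Override
        public void forward(final K key, final V value) {
            throw new UnsupportedOperationException("forward() is not allowed in this context");
        }
    }

    // Returns true if forwarding through the given context failed at runtime.
    static boolean forwardFails(final Context<String, String> context) {
        try {
            context.forward("k", "v");
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(forwardFails(new ForwardingDisabledContext<>())); // true
        System.out.println(forwardFails((key, value) -> { }));               // false
    }
}
```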

However, the spirit of the "new processor api" line of work
is to clean up a lot of the cruft around the original
processor API, so this is a good opportunity to introduce a
type-safe version if we can.

Based on my experience adding the new processor API, I felt
like it should be possible to do what he suggests, but it
would be more involved than what he said. The biggest thing
I learned from that effort, though, is that you really have
to just try it to see what all the complications are.

With that in mind, I went ahead and implemented the
suggestion: https://github.com/apache/kafka/pull/11854

This is a functional prototype. It only adds processValues,
which takes a supplier of a new type, FixedKeyProcessor.
That processor only processes FixedKeyRecords, which have a
key that cannot be changed. FixedKeyProcessors have a
special context, a FixedKeyProcessorContext, which can only
forward FixedKeyRecords.

FixedKeyRecords have "fixed keys" because the key can only
be set in the constructor, and the constructor is
package-private.

As you can see, this new record/processor/context ecosystem
is an independent peer of the general one. This is necessary
to ensure the desired compiler check. For example, if
FixedKeyRecord were merely an interface implemented by
Record, then users could create a new Record with a new key
and forward it as a FixedKeyRecord, violating the
constraint.
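The subtyping argument can be reproduced in a few lines. In this hypothetical model (not the code in the PR), `Record` implements a `FixedKeyRecord` interface, so a processor can build a record with a brand-new key and hand it to a context that only accepts `FixedKeyRecord`, and the compiler accepts it.

```java
public class PeerHierarchySketch {

    // The design the prototype avoids: FixedKeyRecord as a mere interface...
    interface FixedKeyRecord<K, V> {
        K key();
        V value();
    }

    // ...implemented by the general Record, which is free to change its key.
    static final class Record<K, V> implements FixedKeyRecord<K, V> {
        private final K key;
        private final V value;

        Record(final K key, final V value) {
            this.key = key;
            this.value = value;
        }

        @Override public K key() { return key; }
        @Override public V value() { return value; }

        Record<K, V> withKey(final K newKey) {
            return new Record<>(newKey, value);
        }
    }

    // A context that is meant to forward only records whose key was not changed.
    static final class FixedKeyContext<K, V> {
        FixedKeyRecord<K, V> lastForwarded;

        void forward(final FixedKeyRecord<K, V> record) {
            lastForwarded = record;
        }
    }

    // A processor body that re-keys despite the fixed-key contract.
    static String forwardWithNewKey(final FixedKeyRecord<String, String> input) {
        final FixedKeyContext<String, String> context = new FixedKeyContext<>();
        // Compiles, because a Record<String, String> is-a FixedKeyRecord<String, String>.
        context.forward(new Record<>(input.key(), input.value()).withKey("new-key"));
        return context.lastForwarded.key();
    }

    public static void main(final String[] args) {
        System.out.println(forwardWithNewKey(new Record<>("old-key", "v"))); // new-key
    }
}
```

Keeping `FixedKeyRecord` as an independent class with a package-private constructor is what closes this loophole.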

As I said, with this proposal, the devil is in the details,
so if anyone thinks the API can be simplified, I suggest you
check out the branch and try out your proposal. I'd be very
happy to have a simpler solution, but I'm also pretty sure
this complexity is necessary.

Taking a step back, I do think this approach results in a
better API, even though the change is a little complicated.

Thanks,
-John

On Sun, 2022-03-06 at 10:51 +0000, Jorge Esteban Quilcate Otoya wrote:
Matthias, thanks for your feedback.

I can see the following alternatives to deal with `processValues()`:

1. Runtime key validation (current proposal)
2. Using Void type. Guozhang already pointed out some important
considerations about allocating `Record` twice.
3. Adding a new ValueRecord, proposed by Matthias. This one would carry
some of the problems of the second alternative, as ValueRecord will
have to be created from a Record. Also, whether it has a public
constructor or is created from a Record, the key _can_ be changed
without being captured by the Topology.
4. Reducing the KIP scope to `process` only, and removing/postponing
`processValues` for a later DSL redesign.

A couple of additional comments:

About the Record API:

IIUC, the issue with allocating new objects comes from the current
design of the Record API.
If a user does `record.withKey(...).withValue(...)`, that already leads
to a couple of instantiations.
My impression is that if the cost/value of immutability has already
been weighed, then maybe the considerations for alternative 2 can be
disregarded?
Either way, if the cost of recreating objects is something we want to
minimize, then maybe adding a Builder to the record would help to
reduce the allocations.
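A rough sketch of the allocation argument, using a hypothetical immutable record and builder (neither is Kafka's `Record`; the static counter exists only to make allocations visible): chained withers allocate one record per call, while a builder allocates a single record on `build()`. The builder itself is one extra short-lived object, so the saving grows with the number of chained changes.

```java
public class RecordAllocationSketch {

    // Counts records created, purely to make allocations visible in this demo.
    static int recordsAllocated = 0;

    // Hypothetical immutable record: every "wither" returns a new instance.
    static final class ImmutableRecord<K, V> {
        final K key;
        final V value;
        final long timestamp;

        ImmutableRecord(final K key, final V value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            recordsAllocated++;
        }

        <NK> ImmutableRecord<NK, V> withKey(final NK newKey) {
            return new ImmutableRecord<>(newKey, value, timestamp);
        }

        <NV> ImmutableRecord<K, NV> withValue(final NV newValue) {
            return new ImmutableRecord<>(key, newValue, timestamp);
        }

        ImmutableRecord<K, V> withTimestamp(final long newTimestamp) {
            return new ImmutableRecord<>(key, value, newTimestamp);
        }
    }

    // Hypothetical builder: mutable scratch state, one record allocated on build().
    static final class Builder<K, V> {
        private K key;
        private V value;
        private long timestamp;

        Builder(final ImmutableRecord<K, V> from) {
            key = from.key;
            value = from.value;
            timestamp = from.timestamp;
        }

        Builder<K, V> key(final K newKey) { key = newKey; return this; }
        Builder<K, V> value(final V newValue) { value = newValue; return this; }
        Builder<K, V> timestamp(final long newTimestamp) { timestamp = newTimestamp; return this; }

        ImmutableRecord<K, V> build() { return new ImmutableRecord<>(key, value, timestamp); }
    }

    // Records allocated when changing key, value and timestamp with chained withers.
    static int withWithers() {
        final ImmutableRecord<String, String> in = new ImmutableRecord<>("k", "v", 1L);
        recordsAllocated = 0;
        in.withKey("k2").withValue("v2").withTimestamp(2L);
        return recordsAllocated;
    }

    // Records allocated for the same changes through the builder.
    static int withBuilder() {
        final ImmutableRecord<String, String> in = new ImmutableRecord<>("k", "v", 1L);
        recordsAllocated = 0;
        new Builder<>(in).key("k2").value("v2").timestamp(2L).build();
        return recordsAllocated;
    }

    public static void main(final String[] args) {
        System.out.println("withers: " + withWithers() + ", builder: " + withBuilder());
    }
}
```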

About the key validation:

So far, the only way I can see to _really_ validate at compile time
that a key doesn't change is by not exposing it at all, as we do today
with Transform.
Otherwise, we deal with it at runtime, as we have been doing with
Transform without the ability to forward.
The Processor API is, by definition, a lower-level abstraction, so
users should be aware of the potential runtime exceptions if the key
changes.
This is why I'm leaning towards alternative 1.

Looking forward to your feedback.
As a reminder, the vote thread is still open. Feel free to add your
vote or amend it if needed.

Cheers,


On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax <mj...@apache.org> wrote:

John, thanks for verifying source compatibility. My impression was that
it should be source compatible; I was just not 100% sure.

The question about `processValues()` is really a hard one. Guozhang's
point is a very good one. Maybe we need to be pragmatic and accept the
runtime check (even if I deeply hate this solution compared to a
compile-time check).

Other possibilities to address this issue might just become too ugly?
It seems it would require adding a new `ValueProcessorContext` that
offers a `#forward(ValueRecord)` method (with `ValueRecord` being a
`Record` with an immutable key)? Not sure if we would be willing to go
down this route? Personally, I would be ok with it, as I strongly prefer
compile-time checks and I am happy to extend the API surface area to
achieve it -- however, I won't be surprised if others don't like this
idea...



-Matthias

On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:
Thanks, Guozhang.

Compared with reference checks and runtime exceptions for those who
mistakenly change the key, I think that forcing everyone to call
`setValue` may incur more costs.

This is a fair point. I agree that this may incur more costs than key
checking.

Will hold for more feedback, but if we agree I will update the KIP
during the week.

Cheers,
Jorge.


On Sun, 27 Feb 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:

Hello folks,

Regarding the outstanding question, I'm actually leaning a bit towards
the second option, since `withKey()` itself always creates a new Record
object. This has a few implications:

* We would have to discard the previous Record object to be GC'ed in
favor of the new object --- note that in practice, processing a value
does not mean you'd have to replace the whole value with `withValue`;
maybe you just need to manipulate some fields of the value object if it
is JSON / etc.
* It may become an obstacle for further runtime optimizations, e.g.,
skipping serdes and interpreting processing as direct byte manipulation.

Compared with reference checks and runtime exceptions for those who
mistakenly change the key, I think that forcing everyone to call
`setValue` may incur more costs.

Guozhang

On Fri, Feb 25, 2022 at 12:54 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

Hi all,

Appreciate very much all the great feedback received so far.

After applying that interface change, I don't see any syntax
errors in our tests (which use those methods), and the
StreamBuilderTest still passes for me.

This is awesome John, thank you for your efforts here.

Jorge, do you mind clarifying these points in the Compatibility
section of your KIP?

+1. I have clarified the impact of changing the return type in the KIP.

I think the other outstanding question for you is whether
the output key type for processValues should be K or Void.

One thing I realized belatedly was that if we do set it to
Void, then users will actually have to override the key when
forwarding, like `record.withKey(null)`, whereas if we keep
it as K, all users have to do is not touch the key at all.

This is a tricky one.
On one hand, with Void as the output key type, we force users to cast
to Void and change the key to null, though this can be documented in
the API, so users are aware of the peculiarity of forwarding within
`processValues`.
On the other hand, keeping the key type as the output type doesn't
_require_ any change of keys, but this could lead to key-checking
runtime exceptions.

I'm slightly inclined towards the first option, changing the type to
`Void`.
This will impose a bit of pain on users in exchange for some type
safety and avoiding runtime exceptions.
We can justify this requirement as a way to prove that the key hasn't
changed.

Btw, thanks for this idea Matthias!


On Fri, 25 Feb 2022 at 17:10, John Roesler <vvcep...@apache.org> wrote:

Oh, one more thing Jorge,

I think the other outstanding question for you is whether
the output key type for processValues should be K or Void. I
get the impression that all of us don't feel too strongly
about it, so I think the ball is in your court to consider
everyone's points and make a call (with justification).

One thing I realized belatedly was that if we do set it to
Void, then users will actually have to override the key when
forwarding, like `record.withKey(null)`, whereas if we keep
it as K, all users have to do is not touch the key at all.
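John's point can be seen in a small model, using hypothetical stand-ins `Rec` and `Ctx` rather than the real `Record` and `ProcessorContext`: with `Void` as the output key type, the processor has to call `withKey(null)` before forwarding, whereas with `K` it forwards the record without touching the key.

```java
import java.util.ArrayList;
import java.util.List;

public class VoidKeySketch {

    // Simplified, hypothetical stand-in for an immutable Record.
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(final K key, final V value) {
            this.key = key;
            this.value = value;
        }

        <NK> Rec<NK, V> withKey(final NK newKey) { return new Rec<>(newKey, value); }
        <NV> Rec<K, NV> withValue(final NV newValue) { return new Rec<>(key, newValue); }
    }

    // Simplified stand-in for a context that forwards records of type <KOut, VOut>.
    interface Ctx<KOut, VOut> {
        void forward(Rec<KOut, VOut> record);
    }

    // KOut = Void: forwarding in.withValue(...) alone (a Rec<String, Integer>) would not
    // compile, so the processor must replace the key with null first.
    static void processWithVoidKey(final Rec<String, String> in, final Ctx<Void, Integer> ctx) {
        ctx.forward(in.withValue(in.value.length()).withKey((Void) null));
    }

    // KOut = K: the processor simply does not touch the key.
    static void processWithSameKey(final Rec<String, String> in, final Ctx<String, Integer> ctx) {
        ctx.forward(in.withValue(in.value.length()));
    }

    public static void main(final String[] args) {
        final List<Rec<Void, Integer>> voidOut = new ArrayList<>();
        final List<Rec<String, Integer>> sameOut = new ArrayList<>();
        processWithVoidKey(new Rec<>("k", "abc"), voidOut::add);
        processWithSameKey(new Rec<>("k", "abc"), sameOut::add);
        System.out.println(voidOut.get(0).key + " / " + sameOut.get(0).key); // null / k
    }
}
```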

Thanks,
-John

On Fri, 2022-02-25 at 11:07 -0600, John Roesler wrote:
Hello all,

I'll chime in again in the interest of trying to do a better
job of keeping KIPs moving forward...

Matthias raised some very good questions about whether the
change is really source compatible. I just checked out the
code and made the interface change that Jorge specified in
the KIP:

Modified methods:

KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)

After applying that interface change, I don't see any syntax
errors in our tests (which use those methods), and the
StreamBuilderTest still passes for me.

The reason is that the existing API already takes a
ProcessorSupplier<K, V, Void, Void> and currently has a
`void` return.

After this interface change, all existing usages will just
bind Void to KOut and Void to VOut. In other words, KOut,
which is short for `KOut extends Object`, is bounded above by
Object, and Void satisfies that bound, so all existing
processor suppliers are still valid arguments.

Because the current methods are void returns, no existing
code could be assigning the result to any variable, so
moving from a void return to a typed return is always
source compatible.
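The argument can be checked with a toy model (hypothetical types, not the real `KStream`/`ProcessorSupplier`; the old and new shapes get different method names only so both can live in one class): a call site written for the current `void` method with a `Void, Void` supplier also compiles against the generic, typed-return version, because `KOut` and `VOut` simply bind to `Void`.

```java
public class SourceCompatSketch {

    // Hypothetical stand-in for ProcessorSupplier<KIn, VIn, KOut, VOut>.
    interface ProcSupplier<KIn, VIn, KOut, VOut> {
        String name();
    }

    // Hypothetical stand-in for KStream<K, V>; the two methods model the old and new process().
    static final class Stream<K, V> {
        final String lastOperator;

        Stream(final String lastOperator) {
            this.lastOperator = lastOperator;
        }

        // Current shape: output types pinned to Void, nothing returned.
        void oldProcess(final ProcSupplier<K, V, Void, Void> supplier, final String... stores) {
            // would add the processor to the topology
        }

        // Proposed shape: generic output types and a typed return value.
        <KOut, VOut> Stream<KOut, VOut> newProcess(final ProcSupplier<K, V, KOut, VOut> supplier,
                                                   final String... stores) {
            return new Stream<>(supplier.name());
        }
    }

    // An "existing" call site: a Void/Void supplier whose result was never used.
    static Stream<Void, Void> existingCallSite() {
        final Stream<String, String> stream = new Stream<>("source");
        final ProcSupplier<String, String, Void, Void> terminal = () -> "terminal-processor";
        stream.oldProcess(terminal);
        stream.newProcess(terminal);        // still compiles: KOut and VOut bind to Void
        return stream.newProcess(terminal); // and the result is now available, typed Stream<Void, Void>
    }

    public static void main(final String[] args) {
        System.out.println(existingCallSite().lastOperator);
    }
}
```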

Jorge, do you mind clarifying these points in the
Compatibility section of your KIP?

Thanks,
-John


On Wed, 2022-02-23 at 15:07 -0800, Matthias J. Sax wrote:
For this KIP, I also see the value. I was just trying to take a step
back and ask if it's a good short-term solution. If we believe it is, I
am fine with it.

(I am more worried about the headers KIP...)

Btw: I am still wondering if we can change the existing `process()` as
proposed in the KIP? Is the proposed change source compatible? (It's
for sure not binary compatible, but this seems fine -- I don't think we
guarantee binary compatibility).

Btw: it would be good to clarify what changes for process(): should it
be a return type change from `void` to `KStream<KOut, VOut>`, as well
as a change of the `ProcessorSupplier` generic types (output types
change from `Void` to `KOut` and `VOut`)?



-Matthias

On 2/23/22 11:32 AM, Guozhang Wang wrote:
Hi folks,

I agree with John that this KIP by itself could be a good improvement,
and I feel it aligns well with the eventual DSL 2.0 proposal, so we do
not need to hold it until later.

Regarding the last point (i.e., whether we should do enforcement with a
new interface), here's my 2c: in the past we introduced public
`ValueTransformer`/etc. for two purposes: 1) to enforce that the key is
not modifiable, and 2) to indicate to the library's topology builder
itself that, since the key is not modified, the direct downstream does
not need to inject a repartition stage. I think we are more or less on
the same page that for purpose 1), a runtime check could be sufficient;
as for purpose 2), I think this KIP is similar to what we have today
(i.e., it is based on the function name "processValues" itself), and
hence it is not sacrificed either. I do not know if
`KStream#processValue(ProcessorSupplier<K, V, Void, VOut>
processorSupplier)` will work, or work better; maybe Jorge could do some
digging and get back to us.


On Fri, Feb 18, 2022 at 8:24 AM John Roesler <vvcep...@apache.org> wrote:

Hello all,

While I sympathize with Matthias’s desire to wipe the slate clean and
redesign the DSL with full knowledge of everything we’ve learned in the
past few years, that would also be a pretty intense project on its own.
It seems better to leave that project for someone who is motivated to
take it on.

Reading between the lines, it seems like Jorge’s motivation is more
along the lines of removing a few specific pain points. I appreciate
Matthias extending the offer, but if Jorge doesn’t want to redesign the
DSL right now, we’re better off just accepting the work he’s willing to
do.

Specifically, this KIP is quite a nice improvement. Looking at the
KStream interface, roughly half of it is devoted to various flavors of
“transform”, which makes it really hard on users to figure out which
they are supposed to use for what purpose. This KIP lets us drop all
that complexity in favor of just two methods, thanks to the fact that
we now have the ability for processors to specify their forwarding type.

By the way, I really like Matthias’s suggestion to set the KOut generic
bound to Void for processValues. Then, instead of doing an equality
check on the key during forward, you’d just set the key back to the one
saved before processing (with setRecordKey). This is both more
efficient (because we don’t have the equality check) and more foolproof
for users (because it’s enforced by the compiler instead of the
runtime).
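A minimal sketch of that idea, assuming the framework wraps the user's processor (`Rec`, `ValueProcessor` and `ValueProcessorAdapter` are hypothetical names, not classes from the KIP or PR): the wrapper saves the input key before calling the user code and re-attaches it to every forwarded record, so no key comparison is needed at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RestoreKeySketch {

    // Simplified, hypothetical record type.
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(final K key, final V value) {
            this.key = key;
            this.value = value;
        }
    }

    // User-facing processor whose output key type is Void: it cannot emit a real key.
    interface ValueProcessor<K, V, VOut> {
        void process(Rec<K, V> record, Consumer<Rec<Void, VOut>> forward);
    }

    // Framework-side wrapper: saves the input key and re-attaches it on every forward,
    // so no key comparison is needed.
    static final class ValueProcessorAdapter<K, V, VOut> {
        private final ValueProcessor<K, V, VOut> userProcessor;
        private final List<Rec<K, VOut>> downstream;
        private K savedKey;

        ValueProcessorAdapter(final ValueProcessor<K, V, VOut> userProcessor,
                              final List<Rec<K, VOut>> downstream) {
            this.userProcessor = userProcessor;
            this.downstream = downstream;
        }

        void process(final Rec<K, V> record) {
            savedKey = record.key; // plays the role of setRecordKey()
            try {
                userProcessor.process(record, out -> downstream.add(new Rec<>(savedKey, out.value)));
            } finally {
                savedKey = null;   // plays the role of clearing the key afterwards
            }
        }
    }

    static List<Rec<String, Integer>> run(final String key, final String value) {
        final List<Rec<String, Integer>> downstream = new ArrayList<>();
        final ValueProcessor<String, String, Integer> lengthOf =
            (record, forward) -> forward.accept(new Rec<>(null, record.value.length()));
        new ValueProcessorAdapter<>(lengthOf, downstream).process(new Rec<>(key, value));
        return downstream;
    }

    public static void main(final String[] args) {
        final Rec<String, Integer> out = run("user-1", "hello").get(0);
        System.out.println(out.key + " -> " + out.value); // user-1 -> 5
    }
}
```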

Thanks, all!
-John

On Fri, Feb 18, 2022, at 00:43, Jorge Esteban Quilcate Otoya wrote:
On Fri, 18 Feb 2022 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:

It probably deserves its own thread to start discussing ideas.

Yes. My question was: if we think it's time to do a DSL 2.0, should we
drop this KIP and just fix it via DSL 2.0 instead?


Good question. Would love to hear what others think about this.

I've stated my position about this here:

For this KIP specifically, I think about it as a continuation from
KIP-478. Therefore, it could make sense to have it as part of the
current version of the DSL.

I'd even add that if this KIP is adopted, I would not be that
disappointed if KIP-634 is dropped in favor of a DSL v2.0, as the access
to headers provided by KIP-478's Record API is much better than the
previous `.context().headers()`.

But happy to reconsider if there is an agreement to focus efforts
towards a DSL 2.0.


You're right. I'm not proposing the method signature.

What signature do you propose? I don't see an update on the KIP.

My bad. I have clarified this in the KIP's public interfaces now:

```

New methods:

       - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
       - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)

Modified methods:

       - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
       - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)

```



Not sure if I understand what this would look like. Do you mean
checking it on the Record itself or somewhere else?

@Guozhang: I am not worried about the runtime overhead. I am worried
about the user experience. It's not clear from the method signature
that you are not allowed to change the key, which seems to be bad API
design. Even if I understand the desire to keep the API surface area
small -- I would rather have a compile-time enforcement than a runtime
check.

For example, we have `map()` and `mapValues()`, and `mapValues()`
returns a value `V` (which enforces that the key is not changed)
instead of returning a `KeyValue<KIn,VOut>` and using a runtime check
to verify that the key is not changed.
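The difference comes down to the function's return type. In this simplified sketch (interfaces modeled loosely on the DSL's mapper interfaces, with hypothetical names), a `map()`-style function returns a whole pair and can therefore change the key, while a `mapValues()`-style function returns only a value, so the caller keeps the original key without any runtime check.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

public class MapVsMapValuesSketch {

    // map()-style function: returns a whole key-value pair, so it may change the key.
    interface PairMapper<K, V, KR, VR> {
        Map.Entry<KR, VR> apply(K key, V value);
    }

    // mapValues()-style function: returns only a value, so it has no way to replace the key.
    interface ValueOnlyMapper<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    static <K, V, KR, VR> Map.Entry<KR, VR> map(final K key, final V value,
                                                final PairMapper<K, V, KR, VR> mapper) {
        // The key may differ afterwards, so a repartition may be needed downstream.
        return mapper.apply(key, value);
    }

    static <K, V, VR> Map.Entry<K, VR> mapValues(final K key, final V value,
                                                 final ValueOnlyMapper<K, V, VR> mapper) {
        // The original key is re-used by construction: no runtime key check required.
        return new SimpleImmutableEntry<>(key, mapper.apply(key, value));
    }

    public static void main(final String[] args) {
        final Map.Entry<String, String> mapped =
            map("k", "v", (k, v) -> new SimpleImmutableEntry<>("other-key", v));
        final Map.Entry<String, String> valueMapped =
            mapValues("k", "v", (k, v) -> v.toUpperCase());
        System.out.println(mapped + " / " + valueMapped); // other-key=v / k=V
    }
}
```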

Naively, could we enforce something similar by setting the output key
type to `Void`?

       KStream#processValue(ProcessorSupplier<K, V, Void, VOut> processorSupplier)

Not sure if this would work or not?

Or it might be worth adding a new interface, `ValueProcessorSupplier`,
that ensures that the key is not modified?


This is an important discussion, even more so with a DSL v2.0.

At the moment, the DSL just flags whether repartitioning is required
based on the DSL operation. As mentioned, `mapValues()` enforces through
the DSL that only the value has changed, though the only _guarantee_ we
have is that Kafka Streams "owns" the implementation, and we can flag
this properly.

With a hypothetical v2.0 based on the Record API, this will be harder to
enforce with the current APIs, e.g., with `mapValues(Record<K, V>
record)`, nothing would stop users from using
`record.withKey("needs_partitioning")`.

The approach defined in this KIP is similar to what we have at the
moment with `ValueTransformer*`, where it validates at runtime that
users are not calling `forward` with
`ForwardingDisabledProcessorContext`. `ValueProcessorSupplier` is not
meant to be a public API, only to be used internally in the
`processValues` implementation.

At first glance, `KStream#processValue(ProcessorSupplier<K, V, Void,
VOut> processorSupplier)` won't work, as it would require the
`Processor` implementation to actually change the key. I will take a
deeper look to validate whether this could solve the issue.




-Matthias


On 2/17/22 10:56 AM, Guozhang Wang wrote:
Regarding the last question Matthias had, I wonder if it's similar to
my first email's point 2) above? I think the rationale is that, since
reference checks are relatively very cheap, it is worthwhile to pay for
these extra runtime checks and in return have a single consolidated
ProcessorSupplier programming interface (i.e., we would eventually
deprecate ValueTransformerWithKeySupplier).

On Wed, Feb 16, 2022 at 10:57 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

Thank you Matthias, this is great feedback.

Adding my comments below.

On Wed, 16 Feb 2022 at 00:42, Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP.

In line with my reply to KIP-634, I am wondering if we are heading in
the right direction, or if we should consider redesigning the DSL from
scratch?


I'm very excited about the idea of a DSL v2.0. It probably deserves its
own thread to start discussing ideas.

For this KIP specifically, I think about it as a continuation from
KIP-478. Therefore, it could make sense to have it as part of the
current version of the DSL.



Even if we don't do a DSL 2.0 right now, I have some concerns about
this KIP:

(1) I am not sure if the proposed change is backward compatible? We
currently have:

        void KStream#process(ProcessorSupplier, String...)

The newly proposed method:

        KStream KStream#process(ProcessorSupplier)

seems to be an incompatible change?

The KIP states:

Modified method KStream#process should be compatible with previous
version, that at the moment is fixed to a Void return type.

Why is it backward compatible? Having both old and new #process() seems
not to be compatible to me? Or are you proposing to _change_ the method
signature (if yes, the `String...` parameter to add a state store seems
to be missing)? For this case, it seems that existing programs would at
least need to be recompiled -- it would only be a source compatible
change, but not a binary compatible change?


You're right. I'm not proposing the method signature.
Totally agree about the compatibility issue. I was only considering
source compatibility and wasn't aware that changing from void to a
specific type would break binary compatibility.
I will update the KIP to reflect this:

Modifications to method KStream#process are source compatible with the
previous version, though not binary compatible. Therefore, users will be
required to recompile their applications with the latest version.
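The reason recompilation is needed: the JVM links a call by method name plus a descriptor that includes the return type, so a `void` method and a same-named method returning an object are different methods to the JVM. `java.lang.invoke.MethodType` makes this visible; the supplier parameter and the `KStream` return type are represented by `Object` here purely for illustration (after erasure they would be `ProcessorSupplier` and `KStream`).

```java
import java.lang.invoke.MethodType;

public class DescriptorSketch {

    // Old shape: process(supplier, String...) returning void.
    static String oldDescriptor() {
        return MethodType.methodType(void.class, Object.class, String[].class)
                         .toMethodDescriptorString();
    }

    // New shape: same parameters, but returning an object (the new stream).
    static String newDescriptor() {
        return MethodType.methodType(Object.class, Object.class, String[].class)
                         .toMethodDescriptorString();
    }

    public static void main(final String[] args) {
        System.out.println(oldDescriptor()); // (Ljava/lang/Object;[Ljava/lang/String;)V
        System.out.println(newDescriptor()); // (Ljava/lang/Object;[Ljava/lang/String;)Ljava/lang/Object;
    }
}
```

Code compiled against the old `void` signature references the first descriptor, so running it against the new version would fail to link, typically with a `NoSuchMethodError`.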


I am also wondering if/how this change relates to KIP-401:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

From a high level it might not conflict, but I wanted to double check?


Wasn't aware of this KIP, thanks for sharing! I don't think there is a
conflict between the KIPs, as far as I understand.



For `KStream#processValues()`, my main concern is the added runtime
check of whether the key was modified or not -- it seems to provide a
bad user experience -- enforcing that the key is not modified at the
API level would seem to be much better.

Last, what is the purpose of `setRecordKey()` and `clearRecordKey()`? I
am not sure if I understand their purpose?


Both methods set/clear the context (current key) to be used when
checking keys in the forward(record) implementation.
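A minimal sketch of how that could work, assuming the framework calls `setRecordKey()` before the user's `process()` and `clearRecordKey()` afterwards (only those two method names come from the KIP draft; the classes and exception types are hypothetical). Because the key is cleared after every record, a `forward()` from a punctuator has no key to compare against and fails, which matches the punctuation note elsewhere in this thread.

```java
import java.util.function.Consumer;

public class KeyCheckingContextSketch {

    // Simplified, hypothetical record type.
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(final K key, final V value) {
            this.key = key;
            this.value = value;
        }

        Rec<K, V> withKey(final K newKey) {
            return new Rec<>(newKey, value);
        }
    }

    // Context used by processValues(): forward() accepts only the key of the current record.
    static final class KeyCheckingContext<K, V> {
        private K recordKey;
        private boolean processing = false; // separate flag, since null keys are legal
        private int forwarded = 0;

        void setRecordKey(final K key) { recordKey = key; processing = true; }

        void clearRecordKey() { recordKey = null; processing = false; }

        void forward(final Rec<K, V> record) {
            if (!processing) {
                // e.g. called from a punctuator: there is no current key to compare against
                throw new IllegalStateException("forward() is not allowed outside process()");
            }
            if (record.key != recordKey) { // reference equality: cheap, but blind to in-place mutation
                throw new IllegalArgumentException("processValues() must not change the key");
            }
            forwarded++;
        }

        int forwardedCount() { return forwarded; }
    }

    // What the framework would do around each call to the user's process().
    static <K, V> void runProcess(final KeyCheckingContext<K, V> ctx, final Rec<K, V> in,
                                  final Consumer<Rec<K, V>> userProcess) {
        ctx.setRecordKey(in.key);
        try {
            userProcess.accept(in);
        } finally {
            ctx.clearRecordKey();
        }
    }

    public static void main(final String[] args) {
        final KeyCheckingContext<String, String> ctx = new KeyCheckingContext<>();
        final Rec<String, String> in = new Rec<>("k", "v");
        runProcess(ctx, in, r -> ctx.forward(r));                      // accepted
        try {
            runProcess(ctx, in, r -> ctx.forward(r.withKey("other"))); // re-keyed
        } catch (final IllegalArgumentException e) {
            System.out.println("re-keying rejected");
        }
        try {
            ctx.forward(in);                                           // like a punctuator
        } catch (final IllegalStateException e) {
            System.out.println("forward outside process() rejected");
        }
        System.out.println("forwarded: " + ctx.forwardedCount());
    }
}
```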

enforcing that the key is not modified at the API level would seem to
be much better.

Not sure if I understand what this would look like. Do you mean
checking it on the Record itself or somewhere else?



-Matthias


On 2/15/22 11:53 AM, John Roesler wrote:
My apologies, this feedback was intended for KIP-634.
-John

On Tue, Feb 15, 2022, at 13:15, John Roesler wrote:
Thanks for the update, Jorge,

I've just looked over the KIP again. Just one more small concern:

5) We can't just change the type of Record#headers() to a new fully
qualified type. That would be a source-incompatible breaking change for
users.

Our options are:
* Deprecate the existing method and create a new one with the new type
* If the existing Headers is "not great but ok", then maybe we leave it
alone.

Thanks,
-John

On Mon, 2022-02-14 at 13:58 -0600, Paul Whalen wrote:
No specific comments, but I just wanted to mention I like the direction
of the KIP. My team is a big user of "transform" methods because of the
ability to chain them, and I have always found the terminology
challenging to explain alongside "process". It felt like one concept
with two names. So moving towards a single API that is powerful enough
to handle both use cases seems absolutely correct to me.

Paul

On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

Got it. Thanks John, this makes sense.

I've updated the KIP to include the deprecation of:

         - KStream#transform
         - KStream#transformValues
         - KStream#flatTransform
         - KStream#flatTransformValues



On Fri, 11 Feb 2022 at 15:16, John Roesler <vvcep...@apache.org>
wrote:

Thanks, Jorge!

I think it’ll be better to keep this KIP focused on KStream methods
only. I suspect that the KTable methods may be more complicated than
just that proposed replacement, but it’ll also be easier to consider
that question in isolation.

The nice thing about just deprecating the KStream methods and not the
Transform* interfaces is that you can keep your proposal just scoped to
KStream and not have any consequences for the rest of the DSL.

Thanks again,
John

On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya wrote:
Thanks, John.

4) I agree that we shouldn't deprecate the Transformer* classes, but do
you think we should deprecate the KStream#transform* methods? I'm
curious if there's any remaining reason to have those methods, or if
your KIP completely obviates them.

Good catch.
I considered that deprecating `Transformer*` and `transform*` would go
hand in hand (maybe it happened similarly with the old `Processor` and
`process`?).
Though deprecating only the `transform*` operations could be a better
signal for users than not deprecating anything at all, and would pave
the way to its deprecation.

Should this deprecation also consider including
`KTable#transformValues`?
The approach proposed in the KIP:
`ktable.toStream().processValues().toTable()` seems fair to me, though
I will have to test it further.

I'm happy to update the KIP if there's some consensus around this.
I will add the deprecation notes in the next few days and wait for any
additional feedback on this topic before wrapping up the KIP.


On Fri, 11 Feb 2022 at 04:03, John Roesler <vvcep...@apache.org>
wrote:

Thanks for the update, Jorge!

I just read over the KIP again, and I'm in support. One more question
came up for me, though:

4) I agree that we shouldn't deprecate the Transformer* classes, but do
you think we should deprecate the KStream#transform* methods? I'm
curious if there's any remaining reason to have those methods, or if
your KIP completely obviates them.

Thanks,
-John

On Thu, 2022-02-10 at 21:32 +0000, Jorge Esteban Quilcate Otoya wrote:
Thank you both for your feedback!

I have added the following note on punctuation:

```
NOTE: The key validation can be defined when processing the message.
Though, with punctuations it won't be possible to define the key for
validation before forwarding, therefore it won't be possible to
forward from punctuation.
This is similar behavior to how `ValueTransformer`s behave at the
moment.
```

I also made it explicit that we are going to apply referential equality
for key validation.

I hope this covers all your feedback; let me know if I'm missing
anything.

Cheers,
Jorge.

On Wed, 9 Feb 2022 at 22:19, Guozhang Wang <wangg...@gmail.com>
wrote:

I'm +1 on John's point 3) for punctuations.

And if people are on the same page that a reference equality check per
record is not a huge overhead, I think doing that enforcement is better
than documentation and hand-wavy undefined behavior.


Guozhang

On Wed, Feb 9, 2022 at 11:27 AM John Roesler <vvcep...@apache.org>
wrote:

Thanks for the KIP Jorge,

I'm in support of your proposal.

1) I do agree with Guozhang's point (1). I think it's the cleanest
approach: it's cleaner and better to keep the enforcement internal to
the framework than to introduce a public API or context wrapper for
processors to use explicitly.

2) I tend to agree with you on this one; I think the equality check
ought to be fast enough in practice.

3) I think this is implicit, but should be explicit in the KIP: for the
`processValues` API, because the framework sets the key on the context
before calling `process` and then unsets it afterwards, there will
always be no key set during task punctuation. Therefore, while
processors may still register punctuators, they will not be able to
forward anything from them.

This is functionally equivalent to the existing transformers, by the
way, which are also forbidden to forward anything during punctuation.

For what it's worth, I think this is the best tradeoff.

The only alternative I see is not to place any restriction on forwarded
keys at all and just document that if users don't maintain proper
partitioning, they'll get undefined behavior. That might be more
powerful, but it's also a usability problem.

Thanks,
-John

On Wed, 2022-02-09 at 11:34 +0000, Jorge Esteban Quilcate Otoya wrote:
Thanks Guozhang.

Does `ValueProcessorContext` have to be a public API? It seems to me
that this can be completely abstracted away from user interfaces as an
internal class

Totally agree. No intention to add these as public APIs. Will update
the KIP to reflect this.

in the past the rationale for enforcing it at the interface layer
rather than doing runtime checks is that it is more efficient.
I'm not sure how much overhead it may incur to check if the key did not
change: if it is just a reference equality check maybe it's okay.
What's your take on this?

Agree, reference equality should cover this validation and the overhead
impact should not be meaningful.
Will update the KIP to reflect this as well.


On Tue, 8 Feb 2022 at 19:05, Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Jorge,

Thanks for bringing this KIP! I think this is a nice idea to consider
using a single overloaded function name for #process, just a couple of
quick questions after reading the proposal:

1) Does `ValueProcessorContext` have to be a public API? It seems to me
that this can be completely abstracted away from user interfaces as an
internal class, and we call `setKey` before calling the
user-instantiated `process` function, and then in its overridden
`forward` it can just check if the key changes or not.
2) Related to 1) above, in the past the rationale for enforcing it at
the interface layer rather than doing runtime checks is that it is more
efficient. I'm not sure how much overhead it may incur to check if the
key did not change: if it is just a reference equality check maybe it's
okay. What's your take on this?


Guozhang

On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

Hi Dev team,

I'd like to start a new discussion thread on Kafka Streams KIP-820:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API

This KIP aims to extend the current `KStream#process` API to return
output values that can be chained across the topology, as well as to
introduce a new `KStream#processValues` that uses a processor while
validating that keys haven't changed and repartitioning is not
required.

Looking forward to your feedback.

Regards,
Jorge.



--
-- Guozhang



