Hi Damian,

Thanks for the comments.
About overloads: what other alternatives do we have? For
backwards compatibility, we have to add extra methods next to the existing ones.

About ProcessorContext vs RecordContext, you are right. I think I need to
implement a prototype to understand the full picture, as some parts of the
KIP might not be as straightforward as I thought.


Cheers,
Jeyhun

On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian....@gmail.com> wrote:

> Hi Jeyhun,
>
> Is the intention that these methods are new overloads on the KStream,
> KTable, etc?
>
> It is worth noting that a ProcessorContext is not a RecordContext. A
> RecordContext, as it stands, only exists during the processing of a single
> record, whereas the ProcessorContext exists for the lifetime of the
> Processor. So it doesn't make sense to cast a ProcessorContext to a
> RecordContext.
> You mentioned above passing the InternalProcessorContext to the init()
> calls. It is internal for a reason and I think it should remain that way.
> It might be better to move the recordContext() method from
> InternalProcessorContext to ProcessorContext.
>
> In the KIP you have an example showing:
> richMapper.init((RecordContext) processorContext);
> But the interface is:
> public interface RichValueMapper<V, VR> {
>     VR apply(final V value, final RecordContext recordContext);
> }
> i.e., there is no init(...); besides, as explained above, this wouldn't make sense.
>
> Thanks,
> Damian
>
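To make the lifetime distinction concrete, here is a minimal, self-contained sketch in plain Java (no Kafka dependency) of the `RichValueMapper` shape quoted above: the context arrives as an `apply()` argument, so no `init()` is needed, and the long-lived processor passes in the context of whichever record it is currently processing. The four-method `RecordContext` is a simplified stand-in, and `SimpleRecordContext`, `MapValuesWithContextProcessor` and `RecordContextDemo` are hypothetical names, not Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the RecordContext discussed in this thread (not the Kafka class).
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// The shape quoted above: the context is a per-call argument and there is no init().
@FunctionalInterface
interface RichValueMapper<V, VR> {
    VR apply(V value, RecordContext recordContext);
}

// Hypothetical immutable context describing exactly one record.
final class SimpleRecordContext implements RecordContext {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    SimpleRecordContext(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
}

// Hypothetical processor: it lives as long as its task, but the mapper only
// ever sees the context of the record currently being processed.
final class MapValuesWithContextProcessor<V, VR> {
    private final RichValueMapper<V, VR> mapper;
    private final List<VR> output = new ArrayList<>();

    MapValuesWithContextProcessor(RichValueMapper<V, VR> mapper) {
        this.mapper = mapper;
    }

    void process(V value, RecordContext currentRecord) {
        output.add(mapper.apply(value, currentRecord));
    }

    List<VR> output() {
        return output;
    }
}

class RecordContextDemo {
    public static void main(String[] args) {
        // A lambda works because RichValueMapper has a single abstract method.
        MapValuesWithContextProcessor<String, String> processor = new MapValuesWithContextProcessor<>(
            (value, ctx) -> value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset());
        processor.process("a", new SimpleRecordContext("orders", 0, 42L, 1000L));
        processor.process("b", new SimpleRecordContext("orders", 1, 7L, 2000L));
        System.out.println(processor.output()); // [a@orders-0:42, b@orders-1:7]
    }
}
```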
> On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi Matthias,
> >
> > Actually, my intention was to provide RichInitializer, and later on we
> > could provide the context of the record, as you also mentioned.
> > I removed it so as not to confuse users.
> > Regarding the RecordContext and ProcessorContext interfaces, I just
> > noticed the InternalProcessorContext class. Can't we pass it as a
> > parameter to the init() method of processors? Then we would be able to get
> > the RecordContext easily with just a method call.
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > One more thing:
> > >
> > > I don't think `RichInitializer` makes sense. As we don't have any
> > > input record, there is also no context. We could of course provide the
> > > context of the record that triggers the init call, but this seems
> > > semantically questionable. Also, the context for this first record will
> > > be provided by the subsequent call to aggregate anyway.
> > >
> > >
> > > -Matthias
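A minimal sketch of this argument, under simplifying assumptions: `Initializer` takes no arguments because no record exists yet, while a hypothetical context-aware aggregator receives the `RecordContext` of every record, including the first record for a new key. `RichAggregator`, `ToyAggregation` and the two-method `RecordContext` are illustrative stand-ins, not Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the record metadata discussed in this thread.
interface RecordContext {
    String topic();
    long offset();
}

// No input record exists when the initial value is created, so there is no context to pass.
@FunctionalInterface
interface Initializer<VA> {
    VA apply();
}

// Hypothetical context-aware aggregator: every call corresponds to exactly one record.
@FunctionalInterface
interface RichAggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate, RecordContext context);
}

// Toy in-memory aggregation that mimics the initializer/aggregator call order.
final class ToyAggregation<K, V, VA> {
    private final Initializer<VA> initializer;
    private final RichAggregator<K, V, VA> aggregator;
    private final Map<K, VA> store = new HashMap<>();

    ToyAggregation(Initializer<VA> initializer, RichAggregator<K, V, VA> aggregator) {
        this.initializer = initializer;
        this.aggregator = aggregator;
    }

    VA process(K key, V value, RecordContext context) {
        // The initializer runs only for a new key and never sees a context...
        VA current = store.containsKey(key) ? store.get(key) : initializer.apply();
        // ...but the same record's context reaches the aggregator immediately afterwards.
        VA updated = aggregator.apply(key, value, current, context);
        store.put(key, updated);
        return updated;
    }

    // Helper to build a context for the examples.
    static RecordContext context(String topic, long offset) {
        return new RecordContext() {
            @Override public String topic() { return topic; }
            @Override public long offset() { return offset; }
        };
    }
}

class AggregationDemo {
    public static void main(String[] args) {
        ToyAggregation<String, String, String> agg = new ToyAggregation<>(
            () -> "",
            (key, value, aggregate, ctx) -> aggregate + value + "@" + ctx.offset() + ";");
        agg.process("k", "x", ToyAggregation.context("orders", 0L));
        System.out.println(agg.process("k", "y", ToyAggregation.context("orders", 1L))); // x@0;y@1;
    }
}
```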
> > >
> > > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > > Thanks for updating the KIP.
> > > >
> > > > I have one concern with regard to backward compatibility. You suggest
> > > > using RecordContext as the base interface for ProcessorContext. This
> > > > will break compatibility.
> > > >
> > > > I think we should just have two independent interfaces. Our own
> > > > ProcessorContextImpl class would implement both. This allows us to cast
> > > > it to `RecordContext` and thus limit the visible scope.
> > > >
> > > >
> > > > -Matthias
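A minimal sketch of the two-independent-interfaces idea, assuming heavily simplified interfaces (the real `ProcessorContext` has many more methods, and `ProcessorContextImpl` here is a toy rather than the Kafka class). It also illustrates the caveat Damian raises at the top of the thread: because one long-lived object backs both views, a `RecordContext` reference kept past the current record reports the metadata of whatever record is processed next. The narrowed scope is a compile-time restriction only, since a downcast would still reach the full context.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, independent interfaces: neither extends the other.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

interface ProcessorContext {
    String applicationId();
    void forward(Object key, Object value);
}

// Toy implementation: one object backs both views.
final class ProcessorContextImpl implements ProcessorContext, RecordContext {
    private final String applicationId;
    private final List<String> forwarded = new ArrayList<>();
    private String topic;
    private int partition;
    private long offset;
    private long timestamp;

    ProcessorContextImpl(String applicationId) {
        this.applicationId = applicationId;
    }

    // Called by the (toy) runtime before each record is processed.
    void setRecordMetadata(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String applicationId() { return applicationId; }
    @Override public void forward(Object key, Object value) { forwarded.add(key + "=" + value); }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }

    List<String> forwarded() { return forwarded; }

    // Hand out only the RecordContext view: callers cannot call forward()
    // without an explicit downcast.
    RecordContext recordContextView() {
        return this;
    }
}

class ContextViewDemo {
    public static void main(String[] args) {
        ProcessorContextImpl impl = new ProcessorContextImpl("my-app");
        impl.setRecordMetadata("orders", 0, 5L, 100L);
        RecordContext view = impl.recordContextView();
        System.out.println(view.offset()); // 5
        impl.setRecordMetadata("orders", 0, 6L, 200L);
        System.out.println(view.offset()); // 6: the retained view tracks the next record
    }
}
```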
> > > >
> > > >
> > > >
> > > > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> > > >> Hi all,
> > > >>
> > > >> I updated the KIP w.r.t. the discussion and comments.
> > > >> Basically, I eliminated the overloads for a particular method if there
> > > >> are more than 3.
> > > >> As we can see, there are a lot of overloads (and more will come with
> > > >> KIP-149 :) ).
> > > >> So, is it wise to
> > > >> wait for the result of the constructive DSL thread, or
> > > >> extend the KIP to address this issue as well, or
> > > >> continue as it is?
> > > >>
> > > >> Cheers,
> > > >> Jeyhun
> > > >>
> > > >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>
> > > >>> LGTM. Thanks!
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >>>
> > > >>>> Thanks for the comment, Matthias. After all the discussion (thanks to
> > > >>>> all participants), I think this (a single method that passes in a
> > > >>>> RecordContext object) is the best alternative.
> > > >>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into
> > > >>>> the KIP by adding the related method to the RecordContext interface.
> > > >>>>
> > > >>>>
> > > >>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> > > >>>>
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Jeyhun
> > > >>>>
> > > >>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> I would like to push this discussion further. It seems we got nice
> > > >>>>> alternatives (thanks for the summary, Jeyhun!).
> > > >>>>>
> > > >>>>> With respect to RichFunctions and allowing them to be stateful, I
> > > >>>>> have my doubts, as expressed already. From my understanding, the idea
> > > >>>>> was to give access to record metadata information only. If you want
> > > >>>>> to do a stateful computation, you should rather use #transform().
> > > >>>>>
> > > >>>>> Furthermore, as pointed out, we would need to switch to a
> > > >>>>> supplier pattern, introducing many more overloads.
> > > >>>>>
> > > >>>>> For those reasons, I advocate for a simple interface with a single
> > > >>>>> method that passes in a RecordContext object.
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>>
> > > >>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > > >>>>>> Thanks for the comprehensive summary!
> > > >>>>>>
> > > >>>>>> Personally, I'd prefer the option of passing RecordContext as an
> > > >>>>>> additional parameter into the overloaded function. But I'm also open
> > > >>>>>> to other arguments if there is something that I have overlooked.
> > > >>>>>>
> > > >>>>>> Guozhang
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> Thanks for your comments, Matthias and Guozhang.
> > > >>>>>>>
> > > >>>>>>> Below is a quick summary of the main alternatives we looked at to
> > > >>>>>>> introduce Rich functions (I will keep calling them Rich functions
> > > >>>>>>> until we find a better name). The initially proposed alternatives
> > > >>>>>>> were not backwards-compatible, so I will not mention them.
> > > >>>>>>> The related discussions are spread across the KIP-149 and KIP-159
> > > >>>>>>> (this KIP) discussion threads.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 1. The idea of rich functions came up in the KIP-149 discussion
> > > >>>>>>> thread. As a result, we extended KIP-149 to support Rich functions
> > > >>>>>>> as well.
> > > >>>>>>>
> > > >>>>>>> 2. As part of the Rich functions, we provided an
> > > >>>>>>> init(ProcessorContext) method. Afterwards, Damian suggested that we
> > > >>>>>>> should not provide ProcessorContext to users. As a result, we
> > > >>>>>>> separated the two problems into two separate KIPs, as it seems they
> > > >>>>>>> can be solved in parallel.
> > > >>>>>>>
> > > >>>>>>> - One approach we considered was:
> > > >>>>>>>
> > > >>>>>>> public interface ValueMapperWithKey<K, V, VR> {
> > > >>>>>>>     VR apply(final K key, final V value);
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> public interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> public interface RichFunction {
> > > >>>>>>>     void init(RecordContext recordContext);
> > > >>>>>>>     void close();
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> public interface RecordContext {
> > > >>>>>>>     String applicationId();
> > > >>>>>>>     TaskId taskId();
> > > >>>>>>>     StreamsMetrics metrics();
> > > >>>>>>>     String topic();
> > > >>>>>>>     int partition();
> > > >>>>>>>     long offset();
> > > >>>>>>>     long timestamp();
> > > >>>>>>>     Map<String, Object> appConfigs();
> > > >>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> public interface ProcessorContext extends RecordContext {
> > > >>>>>>>    // all methods but the ones in RecordContext
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> As a result:
> > > >>>>>>> * All "withKey" and "withoutKey" interfaces can be converted to
> > > >>>>>>>   their Rich counterparts (with empty init() and close() methods).
> > > >>>>>>> * All related Processors will accept Rich interfaces in their
> > > >>>>>>>   constructors.
> > > >>>>>>> * So, we convert the related "withKey" or "withoutKey" interfaces
> > > >>>>>>>   to Rich interfaces while building the topology, and initialize
> > > >>>>>>>   the related processors with Rich interfaces only.
> > > >>>>>>> * We will not need overloaded methods for rich functions, as Rich
> > > >>>>>>>   interfaces extend withKey interfaces. We will just check the
> > > >>>>>>>   object type and act accordingly.
> > > >>>>>>>
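The conversion described in the bullets above could look roughly like the following sketch. It assumes trimmed-down interfaces, assumes `RichValueMapper` extends both `ValueMapperWithKey` and `RichFunction` (as the last bullet implies), and uses hypothetical helpers `RichAdapters` and `ToyMapValuesProcessor` that do not exist in Kafka Streams.

```java
// Simplified stand-in for the record metadata.
interface RecordContext {
    long offset();
}

interface RichFunction {
    void init(RecordContext recordContext);
    void close();
}

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K key, V value);
}

// Assumed shape: Rich interfaces extend the withKey interfaces.
interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
}

final class RichAdapters {
    private RichAdapters() { }

    // "Check the object type and act accordingly": rich mappers pass through,
    // plain withKey mappers get no-op init() and close().
    static <K, V, VR> RichValueMapper<K, V, VR> fromValueMapperWithKey(ValueMapperWithKey<K, V, VR> mapper) {
        if (mapper instanceof RichValueMapper) {
            return (RichValueMapper<K, V, VR>) mapper;
        }
        return new RichValueMapper<K, V, VR>() {
            @Override public void init(RecordContext recordContext) { }
            @Override public void close() { }
            @Override public VR apply(K key, V value) { return mapper.apply(key, value); }
        };
    }

    // withoutKey mappers are first widened by ignoring the key.
    static <K, V, VR> RichValueMapper<K, V, VR> fromValueMapper(ValueMapper<V, VR> mapper) {
        ValueMapperWithKey<K, V, VR> withKey = (key, value) -> mapper.apply(value);
        return fromValueMapperWithKey(withKey);
    }
}

// Hypothetical processor that only ever deals with the Rich interface.
final class ToyMapValuesProcessor<K, V, VR> {
    private final RichValueMapper<K, V, VR> mapper;

    ToyMapValuesProcessor(RichValueMapper<K, V, VR> mapper) { this.mapper = mapper; }

    void init(RecordContext context) { mapper.init(context); }
    VR process(K key, V value) { return mapper.apply(key, value); }
    void close() { mapper.close(); }
}

class RichAdapterDemo {
    public static void main(String[] args) {
        ValueMapper<String, Integer> length = String::length;
        RichValueMapper<String, String, Integer> rich = RichAdapters.fromValueMapper(length);
        ToyMapValuesProcessor<String, String, Integer> processor = new ToyMapValuesProcessor<>(rich);
        processor.init(() -> 0L); // RecordContext has one method here, so a lambda suffices
        System.out.println(processor.process("key", "abcd")); // 4
        processor.close();
    }
}
```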
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 3. There were some concerns that the above approach does not support
> > > >>>>>>> lambdas, so we should support only one method, init(RecordContext),
> > > >>>>>>> as part of Rich interfaces.
> > > >>>>>>> This is still under discussion. Personally, I think Rich interfaces
> > > >>>>>>> are by definition lambda-free and we should not worry much about it.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 4. Following Matthias's comments, an alternative we considered was to
> > > >>>>>>> pass in the RecordContext as a method parameter. This might even
> > > >>>>>>> allow using lambdas, and we could keep the name RichFunction as we
> > > >>>>>>> preserve the nature of being a function.
> > > >>>>>>> "If you go with `init()` and `close()` we basically
> > > >>>>>>> allow users to have an in-memory state for a function. Thus, we
> > > >>>>>>> cannot share a single instance of RichValueMapper (etc) over
> > > >>>>>>> multiple tasks and we would need a supplier pattern similar to
> > > >>>>>>> #transform(). And this would "break the flow" of the API, as
> > > >>>>>>> (Rich)ValueMapperSupplier would not inherit from ValueMapper and
> > > >>>>>>> thus we would need many new overloads for KStream/KTable classes."
> > > >>>>>>> (Quoted from Matthias's email.)
> > > >>>>>>>
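The trade-off in point 4 can be sketched with simplified stand-in interfaces and hypothetical names (`StatefulValueMapper`, `CountingMapper` and `ToyTasks` are illustrations, not proposed API). A mapper that receives the context as a parameter keeps no state, so one lambda instance can be shared by every task; an `init()`/`close()`-style mapper may hold in-memory state, so each task needs its own instance, which is why a `Supplier` (and, in the DSL, extra overloads) would be required.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Simplified stand-in for the record metadata.
interface RecordContext {
    int partition();
    long offset();
}

// Context passed as a parameter: stateless and lambda-friendly.
@FunctionalInterface
interface RichValueMapper<V, VR> {
    VR apply(V value, RecordContext context);
}

// init()/close() style: the instance may keep in-memory state.
interface StatefulValueMapper<V, VR> {
    void init();
    VR apply(V value);
    void close();
}

// Example stateful mapper: numbers the values it has seen so far.
final class CountingMapper implements StatefulValueMapper<String, String> {
    private int seen;
    @Override public void init() { seen = 0; }
    @Override public String apply(String value) { return value + "#" + (++seen); }
    @Override public void close() { }
}

final class ToyTasks {
    private ToyTasks() { }

    // Runs one task over one partition; the same mapper instance can be reused by every task.
    static List<String> runShared(int partition, List<String> values, RichValueMapper<String, String> mapper) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            final long offset = i;
            RecordContext ctx = new RecordContext() {
                @Override public int partition() { return partition; }
                @Override public long offset() { return offset; }
            };
            out.add(mapper.apply(values.get(i), ctx));
        }
        return out;
    }

    // Each task asks the supplier for a fresh instance, so state never leaks across tasks.
    static List<String> runWithSupplier(List<String> values, Supplier<StatefulValueMapper<String, String>> supplier) {
        StatefulValueMapper<String, String> mapper = supplier.get();
        mapper.init();
        List<String> out = new ArrayList<>();
        for (String value : values) {
            out.add(mapper.apply(value));
        }
        mapper.close();
        return out;
    }
}

class SupplierDemo {
    public static void main(String[] args) {
        RichValueMapper<String, String> tag = (v, ctx) -> v + "@p" + ctx.partition() + ":" + ctx.offset();
        System.out.println(ToyTasks.runShared(0, Arrays.asList("a", "b"), tag)); // [a@p0:0, b@p0:1]
        System.out.println(ToyTasks.runShared(1, Arrays.asList("c"), tag)); // [c@p1:0]
        System.out.println(ToyTasks.runWithSupplier(Arrays.asList("a", "b"), CountingMapper::new)); // [a#1, b#2]
        System.out.println(ToyTasks.runWithSupplier(Arrays.asList("c"), CountingMapper::new)); // [c#1]
    }
}
```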
> > > >>>>>>>
> > > >>>>>>> Cheers,
> > > >>>>>>> Jeyhun
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>>> Yes, we did consider this, and there is no consensus yet on what the
> > > >>>>>>>> best alternative is.
> > > >>>>>>>>
> > > >>>>>>>> @Jeyhun: the email thread got pretty long. Maybe you can give a
> > > >>>>>>>> quick summary of the current state of the discussion?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> > > >>>>>>>>> Thanks for the explanation, Jeyhun and Matthias.
> > > >>>>>>>>>
> > > >>>>>>>>> I have just read through both KIP-149 and KIP-159 and am wondering
> > > >>>>>>>>> if you guys have considered a slightly different approach for rich
> > > >>>>>>>>> functions, that is, to add the `RecordContext` into the apply
> > > >>>>>>>>> functions as an additional parameter. For example:
> > > >>>>>>>>>
> > > >>>>>>>>> ---------------------------
> > > >>>>>>>>>
> > > >>>>>>>>> interface RichValueMapper<V, VR> {
> > > >>>>>>>>>
> > > >>>>>>>>>     VR apply(final V value, final RecordContext context);
> > > >>>>>>>>>
> > > >>>>>>>>> }
> > > >>>>>>>>>
> > > >>>>>>>>> ...
> > > >>>>>>>>>
> > > >>>>>>>>> // then in KStreams
> > > >>>>>>>>>
> > > >>>>>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > > >>>>>>>>> <VR> KStream<K, VR> mapValuesWithContext(RichValueMapper<? super V, ? extends VR> mapper);
> > > >>>>>>>>>
> > > >>>>>>>>> -------------------------------
> > > >>>>>>>>>
> > > >>>>>>>>> The caveat is that it will introduce more overloads; but I think the
> > > >>>>>>>>> number of overloads is mainly driven by 1) serde overrides and 2)
> > > >>>>>>>>> state-store-supplier overrides, both of which can be reduced in the
> > > >>>>>>>>> near future, and I feel this overloading is still worthwhile, as it
> > > >>>>>>>>> has the following benefits:
> > > >>>>>>>>>
> > > >>>>>>>>> 1) It still allows lambda expressions.
> > > >>>>>>>>> 2) It gives a clearer code path (no need to "convert" from non-rich
> > > >>>>>>>>> functions to rich functions).
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Maybe this approach has already been discussed and I overlooked it
> > > >>>>>>>>> in the email thread; anyway, lmk.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Guozhang
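A toy model of the proposed method pair, illustrating the two benefits listed above: both methods accept lambdas, and no conversion between plain and rich functions is needed. `ToyStream` is a hypothetical in-memory stand-in for `KStream` (it ignores keys, serdes and topology building), and `RecordContext` is trimmed to a single accessor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the record metadata.
interface RecordContext {
    long offset();
}

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

@FunctionalInterface
interface RichValueMapper<V, VR> {
    VR apply(V value, RecordContext context);
}

// Hypothetical stand-in for KStream: values plus the context of the record each came from.
final class ToyStream<V> {
    private final List<V> values;
    private final List<RecordContext> contexts;

    ToyStream(List<V> values, List<RecordContext> contexts) {
        this.values = values;
        this.contexts = contexts;
    }

    // Existing method: unchanged and still lambda-friendly.
    <VR> ToyStream<VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        List<VR> out = new ArrayList<>();
        for (V value : values) {
            out.add(mapper.apply(value));
        }
        return new ToyStream<>(out, contexts);
    }

    // New, separately named method: the context is simply one more argument.
    <VR> ToyStream<VR> mapValuesWithContext(RichValueMapper<? super V, ? extends VR> mapper) {
        List<VR> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            out.add(mapper.apply(values.get(i), contexts.get(i)));
        }
        return new ToyStream<>(out, contexts);
    }

    List<V> values() {
        return Collections.unmodifiableList(values);
    }
}

class ToyStreamDemo {
    public static void main(String[] args) {
        RecordContext first = () -> 10L;
        RecordContext second = () -> 11L;
        ToyStream<String> stream = new ToyStream<>(Arrays.asList("a", "bb"), Arrays.asList(first, second));
        ToyStream<Integer> lengths = stream.mapValues(String::length);
        ToyStream<String> tagged = lengths.mapValuesWithContext((n, ctx) -> n + "@" + ctx.offset());
        System.out.println(tagged.values()); // [1@10, 2@11]
    }
}
```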
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> I agree with Jeyhun. As already mentioned, the overall API
> > > >>>>>>>>>> improvement ideas are overlapping and/or contradicting each other.
> > > >>>>>>>>>> For this reason, not all ideas can be accomplished, and some Jiras
> > > >>>>>>>>>> might just be closed as "won't fix".
> > > >>>>>>>>>>
> > > >>>>>>>>>> For this reason, we try to give these KIP discussions a large
> > > >>>>>>>>>> scope, to get an overall picture and converge to an overall
> > > >>>>>>>>>> consistent API.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It
> > > >>>>>>>>>> might be sufficient, though, to do a single xxxWithContext()
> > > >>>>>>>>>> overload that will provide key+value+context. Otherwise, it might
> > > >>>>>>>>>> get too messy having ValueMapper, ValueMapperWithKey,
> > > >>>>>>>>>> ValueMapperWithContext, and ValueMapperWithKeyWithContext.
> > > >>>>>>>>>>
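One possible reading of the single-overload suggestion, sketched under assumptions: only the widest variant (key + value + context) gets a new interface, here given the name `ValueMapperWithKeyWithContext` from the list above, and the narrower mappers are adapted into it internally, so the DSL keeps one code path. `Adapt` and `ToyMapValues` are hypothetical helpers, and `RecordContext` is trimmed to one accessor.

```java
// Simplified stand-in for the record metadata.
interface RecordContext {
    long offset();
}

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K key, V value);
}

// The single widest variant: key + value + context.
@FunctionalInterface
interface ValueMapperWithKeyWithContext<K, V, VR> {
    VR apply(K key, V value, RecordContext context);
}

// Internal adapters: narrower mappers are widened, so one processor implementation suffices.
final class Adapt {
    private Adapt() { }

    static <K, V, VR> ValueMapperWithKeyWithContext<K, V, VR> widen(ValueMapper<V, VR> mapper) {
        return (key, value, context) -> mapper.apply(value);
    }

    static <K, V, VR> ValueMapperWithKeyWithContext<K, V, VR> widenWithKey(ValueMapperWithKey<K, V, VR> mapper) {
        return (key, value, context) -> mapper.apply(key, value);
    }
}

// The one code path that every mapValues-style variant would end up in.
final class ToyMapValues<K, V, VR> {
    private final ValueMapperWithKeyWithContext<K, V, VR> mapper;

    ToyMapValues(ValueMapperWithKeyWithContext<K, V, VR> mapper) { this.mapper = mapper; }

    VR process(K key, V value, RecordContext context) { return mapper.apply(key, value, context); }
}

class WidenDemo {
    public static void main(String[] args) {
        RecordContext ctx = () -> 3L;
        ValueMapper<String, Integer> length = String::length;
        ValueMapperWithKeyWithContext<String, String, Integer> widened = Adapt.widen(length);
        System.out.println(new ToyMapValues<>(widened).process("k", "abc", ctx)); // 3
        ValueMapperWithKeyWithContext<String, String, String> full = (k, v, c) -> k + "=" + v + "@" + c.offset();
        System.out.println(new ToyMapValues<>(full).process("k", "v", ctx)); // k=v@3
    }
}
```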
> > > >>>>>>>>>> On the other hand, we also have the "builder pattern" idea as an
> > > >>>>>>>>>> API change, and this might mitigate the overload problem. Not for
> > > >>>>>>>>>> simple functions like map/flatMap etc., but for joins and
> > > >>>>>>>>>> aggregations.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On the other hand, as I mentioned in an older email, I am
> > > >>>>>>>>>> personally fine with breaking the pure functional interface and
> > > >>>>>>>>>> adding
> > > >>>>>>>>>>
> > > >>>>>>>>>>   - interface WithRecordContext with method `open(RecordContext)`
> > > >>>>>>>>>> (or `init(...)`, or any better name -- but not `close()`)
> > > >>>>>>>>>>
> > > >>>>>>>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
> > > >>>>>>>>>> WithRecordContext
> > > >>>>>>>>>>
> > > >>>>>>>>>> This would allow us to avoid any overload. Of course, we don't get
> > > >>>>>>>>>> a "pure function" interface, and we also sacrifice lambdas.
> > > >>>>>>>>>>
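A sketch of this no-overload alternative, assuming `open(RecordContext)` is called once per record just before `apply()` (the email leaves the call timing open) and using simplified stand-in interfaces; `OffsetTagger` and `ToyMapValuesProcessor` are hypothetical. The processor keeps a single entry point and checks the mapper's type at runtime. The cost shows up as well: `ValueMapperWithRecordContext` has two abstract methods, so it cannot be written as a lambda, and an implementation that stores the context carries mutable state.

```java
// Simplified stand-in for the record metadata.
interface RecordContext {
    long offset();
}

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface WithRecordContext {
    void open(RecordContext recordContext);
}

// Two abstract methods (apply and open): not a functional interface, so no lambdas.
interface ValueMapperWithRecordContext<V, VR> extends ValueMapper<V, VR>, WithRecordContext {
}

// Example implementation: remembers the context handed to open().
final class OffsetTagger implements ValueMapperWithRecordContext<String, String> {
    private RecordContext context;

    @Override public void open(RecordContext recordContext) { this.context = recordContext; }
    @Override public String apply(String value) { return value + "@" + context.offset(); }
}

// One entry point for both kinds of mapper; no extra overload needed.
final class ToyMapValuesProcessor<V, VR> {
    private final ValueMapper<V, VR> mapper;

    ToyMapValuesProcessor(ValueMapper<V, VR> mapper) { this.mapper = mapper; }

    VR process(V value, RecordContext context) {
        if (mapper instanceof WithRecordContext) {
            // Assumed call timing: once per record, right before apply().
            ((WithRecordContext) mapper).open(context);
        }
        return mapper.apply(value);
    }
}

class WithRecordContextDemo {
    public static void main(String[] args) {
        RecordContext at3 = () -> 3L;
        ToyMapValuesProcessor<String, String> rich = new ToyMapValuesProcessor<>(new OffsetTagger());
        System.out.println(rich.process("a", at3)); // a@3
        ValueMapper<String, String> upper = String::toUpperCase; // plain mappers still accept lambdas
        ToyMapValuesProcessor<String, String> plain = new ToyMapValuesProcessor<>(upper);
        System.out.println(plain.process("a", at3)); // A
    }
}
```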
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> I am personally a little undecided about what the better option
> > > >>>>>>>>>> might be. Curious to hear what others think about this trade-off.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> > > >>>>>>>>>>> Hi Guozhang,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> It partially subsumes it. Initially, the idea was to support
> > > >>>>>>>>>>> RichFunctions as a separate interface. Throughout the discussion,
> > > >>>>>>>>>>> however, we considered that overloading the related methods (with
> > > >>>>>>>>>>> a RecordContext param) may be a better approach than providing a
> > > >>>>>>>>>>> separate RichFunction interface.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Cheers,
> > > >>>>>>>>>>> Jeyhun
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Does this KIP subsume this ticket as well?
> > > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Dear community,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> As discussed in the KIP-149 [DISCUSS] thread [1], I would like to
> > > >>>>>>>>>>>>> initiate a KIP for rich functions (interfaces) [2].
> > > >>>>>>>>>>>>> I would like to get your comments.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> [1] http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> > > >>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>>> Jeyhun
> > > >>>>>>>>>>>>> --
> > > >>>>>>>>>>>>> -Cheers
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Jeyhun
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> --
> > > >>>>>>>>>>>> -- Guozhang
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>> -Cheers
> > > >>>>>>>
> > > >>>>>>> Jeyhun
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>> --
> > > >>>> -Cheers
> > > >>>>
> > > >>>> Jeyhun
> > > >>>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> -- Guozhang
> > > >>>
> > > >
> > >
> > > --
> > -Cheers
> >
> > Jeyhun
> >
>
-- 
-Cheers

Jeyhun
