Hi, Jay and all,

Thanks for all your quick responses. I tried to summarize my thoughts here:

- ConsumerRecord as stream processor API:

   * This KafkaProcessor API is targeted to receive the message from Kafka.
So, to Yasuhiro's join/transformation example, any join/transformation
results that are materialized in Kafka should have ConsumerRecord format
(i.e. w/ topic and offsets). Any non-materialized join/transformation
results should not be processed by this KafkaProcessor API. One example is
the in-memory operators API in Samza, which is designed to handle the
non-materialzied join/transformation results. And yes, in this case, a more
abstract data model is needed.

   * Just to support Jay's point of a general
ConsumerRecord/ProducerRecord, a general stream processing on more than one
data sources would need at least the following info: data source
description (i.e. which topic/table), and actual data (i.e. key-value
pairs). It would make sense to have the data source name as part of the
general metadata in stream processing (think about it as the table name for
records in standard SQL).

- SQL/DSL

   * I think that this topic itself is worthy of another KIP discussion. I
would prefer to leave it out of scope in KIP-28.

- Client-side pluggable partition manager

   * Given the use cases we have seen with large-scale deployment of
Samza/Kafka in LinkedIn, I would argue that we should make it as the
first-class citizen in this KIP. The use cases include:

      * multi-cluster Kafka

      * host-affinity (i.e. local-state associated w/ certain partitions on
client)

- Multi-cluster scenario

   * Although I originally just brought it up as a use case that requires
client-side partition manager, reading Jay’s comments, I realized that I
have one fundamental issue w/ the current copycat + transformation model.
If I interpret Jay’s comment correctly, the proposed copycat+transformation
plays out in the following way: i) copycat takes all data from sources (no
matter it is Kafka or non-Kafka) into *one single Kafka cluster*; ii)
transformation is only restricted to take data sources in *this single
Kafka cluster* to perform aggregate/join etc. This is different from my
original understanding of the copycat. The main issue I have with this
model is: huge data-copy between Kafka clusters. In LinkedIn, we used to
follow this model that uses MirrorMaker to map topics from tracking
clusters to Samza-specific Kafka cluster and only do stream processing in
the Samza-specific Kafka cluster. We moved away from this model and started
allowing users to directly consume from tracking Kafka clusters due to the
overhead of copying huge amount of traffic between Kafka clusters. I agree
that the initial design of KIP-28 would probably need a smaller scope of
problem to solve, hence, limiting to solving partition management in a
single cluster. However, I would really hope the design won’t prevent the
use case of processing data directly from multiple clusters. In my opinion,
making the partition manager as a client-side pluggable logic would allow
us to achieve these goals.

Thanks a lot in advance!

-Yi

On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Yi,
>
> For your other two points:
>
> - This definitely doesn't cover any kind of SQL or anything like this.
>
> - The prototype we started with just had process() as a method but Yasuhiro
> had some ideas of adding additional filter/aggregate convenience methods.
> We should discuss how this would fit with the operator work you were doing
> in Samza. Probably the best way is just get the code out there in current
> state and start talking about it?
>
> - Your point about multiple clusters. We actually have a proposed extension
> for the Kafka group management protocol that would allow it to cover
> multiple clusters but actually I think that use case is not the focus. I
> think in scope would be consuming from one cluster and producing to
> another.
>
> One of the assumptions we are making is that we will split into two
> categories:
> a. Ingress/egress which is handled by copycat
> b. Transformation which would be handled by this api
>
> I think there are a number of motivations for this
> - It is really hard to provide hard guarantees if you allow non-trivial
> aggregation coupled with the ingress/egress. So if you want to be able to
> do something that provides a kind of end-to-end "exactly once" guarantee
> (that's not really the right term but what people use) I think it will be
> really hard to do this across multiple systems (hello two-phase commit)
> - The APIs for ingest/egress end up needing to be really different for a
> first-class ingestion framework
>
> So the case where you have data coming from many systems including many
> Kafka clusters is just about how easy/hard it is to use copycat with the
> transformer api in the same program. I think this is something we should
> work out as part of the prototyping.
>
> -Jay
>
> On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Guozhang,
> >
> > Thanks for starting this. I took a quick look and had the following
> > thoughts to share:
> >
> > - In the proposed KafkaProcessor API, there is no interface like
> Collector
> > that allows users to send messages to. Why is that? Is the idea to
> > initialize the producer once and re-use it in the processor? And if there
> > are many KStreamThreads in the process, are there going to be many
> > instances of KafkaProducer although all outputs are sending to the same
> > Kafka cluster?
> >
> > - Won’t it be simpler if the process() API just takes in the
> ConsumerRecord
> > as the input instead of a tuple of (topic, key, value)?
> >
> > - Also, the input only indicates the topic of a message. What if the
> stream
> > task needs to consume and produce messages from/to multiple Kafka
> clusters?
> > To support that case, there should be a system/cluster name in both input
> > and output as well.
> >
> > - How are the output messages handled? There does not seem to have an
> > interface that allows user to send an output messages to multiple output
> > Kafka clusters.
> >
> > - It seems the proposed model also assumes one thread per processor. What
> > becomes thread-local and what are shared among processors? Is the
> proposed
> > model targeting to have the consumers/producers become thread-local
> > instances within each KafkaProcessor? What’s the cost associated with
> this
> > model?
> >
> > - One more important issue: how do we plug-in client-side partition
> > management logic? Considering about the use case where the stream task
> > needs to consume from multiple Kafka clusters, I am not even sure that we
> > can rely on Kafka broker to maintain the consumer group membership? Maybe
> > we still can get the per cluster consumer group membership and
> partitions.
> > However, in this case, we truly need a client-side plugin partition
> > management logic to determine how to assign partitions in different Kafka
> > clusters to consumers (i.e. consumers for cluster1.topic1.p1 and
> > cluster2.topic2.p1 has to be assigned together to one KafkaProcessor for
> > processing). Based on the full information about (group members, all
> topic
> > partitions) in all Kafka clusters with input topics, there should be two
> > levels of partition management policies: a) how to group all topic
> > partitions in all Kafka clusters to processor groups (i.e. the same
> concept
> > as Task group in Samza); b) how to assign the processor groups to group
> > members. Note if a processor group includes topic partitions from more
> than
> > one Kafka clusters, it has to be assigned to the common group members in
> > all relevant Kafka clusters. This can not be done just by the brokers in
> a
> > single Kafka cluster.
> >
> > - It seems that the intention of this KIP is also trying to put SQL/DSL
> > libraries into Kafka. Why is it? Shouldn't Kafka be more focused on
> hiding
> > system-level integration details and leave it open for any additional
> > modules outside the Kafka core to enrich the functionality that are
> > user-facing?
> >
> > Just a few quick cents. Thanks a lot!
> >
> > -Yi
> >
> > On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede <n...@confluent.io>
> wrote:
> >
> > > Ewen:
> > >
> > > * I think trivial filtering and aggregation on a single stream usually
> > work
> > > > fine with this model.
> > >
> > >
> > > The way I see this, the process() API is an abstraction for
> > > message-at-a-time computations. In the future, you could imagine
> > providing
> > > a simple DSL layer on top of the process() API that provides a set of
> > APIs
> > > for stream processing operations on sets of messages like joins,
> windows
> > > and various aggregations.
> > >
> > > * Spark (and presumably
> > > > spark streaming) is supposed to get a big win by handling shuffles
> such
> > > > that the data just stays in cache and never actually hits disk, or at
> > > least
> > > > hits disk in the background. Will we take a hit because we always
> write
> > > to
> > > > Kafka?
> > >
> > >
> > > The goal isn't so much about forcing materialization of intermediate
> > > results into Kafka but designing the API to integrate with Kafka to
> allow
> > > such materialization, wherever that might be required. The downside
> with
> > > other stream processing frameworks is that they have weak integration
> > with
> > > Kafka where interaction with Kafka is only at the endpoints of
> processing
> > > (first input, final output). Any intermediate operations that might
> > benefit
> > > from persisting intermediate results into Kafka are forced to be broken
> > up
> > > into 2 separate topologies/plans/stages of processing that lead to more
> > > jobs. The implication is that now the set of stream processing
> operations
> > > that should really have lived in one job per application is now split
> up
> > > across several piecemeal jobs that need to be monitored, managed and
> > > operated separately. The APIs should still allows in-memory storage of
> > > intermediate results where they make sense.
> > >
> > > Jiangjie,
> > >
> > > I just took a quick look at the KIP, is it very similar to mirror maker
> > > > with message handler?
> > >
> > >
> > > Not really. I wouldn't say it is similar, but mirror maker is a special
> > > instance of using copycat with Kafka source, sink + optionally the
> > > process() API. I can imagine replacing the MirrorMaker, in the due
> course
> > > of time, with copycat + process().
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Thu, Jul 23, 2015 at 11:32 PM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > I just took a quick look at the KIP, is it very similar to mirror
> maker
> > > > with message handler?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava <
> > > e...@confluent.io
> > > > >
> > > > wrote:
> > > >
> > > > > Just some notes on the KIP doc itself:
> > > > >
> > > > > * It'd be useful to clarify at what point the plain consumer +
> custom
> > > > code
> > > > > + producer breaks down. I think trivial filtering and aggregation
> on
> > a
> > > > > single stream usually work fine with this model. Anything where you
> > > need
> > > > > more complex joins, windowing, etc. are where it breaks down. I
> think
> > > > most
> > > > > interesting applications require that functionality, but it's
> helpful
> > > to
> > > > > make this really clear in the motivation -- right now, Kafka only
> > > > provides
> > > > > the lowest level plumbing for stream processing applications, so
> most
> > > > > interesting apps require very heavyweight frameworks.
> > > > > * I think the feature comparison of plain producer/consumer, stream
> > > > > processing frameworks, and this new library is a good start, but we
> > > might
> > > > > want something more thorough and structured, like a feature matrix.
> > > Right
> > > > > now it's hard to figure out exactly how they relate to each other.
> > > > > * I'd personally push the library vs. framework story very strongly
> > --
> > > > the
> > > > > total buy-in and weak integration story of stream processing
> > frameworks
> > > > is
> > > > > a big downside and makes a library a really compelling (and
> currently
> > > > > unavailable, as far as I am aware) alternative.
> > > > > * Comment about in-memory storage of other frameworks is
> interesting
> > --
> > > > it
> > > > > is specific to the framework, but is supposed to also give
> > performance
> > > > > benefits. The high-level functional processing interface would
> allow
> > > for
> > > > > combining multiple operations when there's no shuffle, but when
> there
> > > is
> > > > a
> > > > > shuffle, we'll always be writing to Kafka, right? Spark (and
> > presumably
> > > > > spark streaming) is supposed to get a big win by handling shuffles
> > such
> > > > > that the data just stays in cache and never actually hits disk, or
> at
> > > > least
> > > > > hits disk in the background. Will we take a hit because we always
> > write
> > > > to
> > > > > Kafka?
> > > > > * I really struggled with the structure of the KIP template with
> > > Copycat
> > > > > because the flow doesn't work well for proposals like this. They
> > aren't
> > > > as
> > > > > concrete changes as the KIP template was designed for. I'd
> completely
> > > > > ignore that template in favor of optimizing for clarity if I were
> > you.
> > > > >
> > > > > -Ewen
> > > > >
> > > > > On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I just posted KIP-28: Add a transform client for data processing
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
> > > > > > >
> > > > > > .
> > > > > >
> > > > > > The wiki page does not yet have the full design / implementation
> > > > details,
> > > > > > and this email is to kick-off the conversation on whether we
> should
> > > add
> > > > > > this new client with the described motivations, and if yes what
> > > > features
> > > > > /
> > > > > > functionalities should be included.
> > > > > >
> > > > > > Looking forward to your feedback!
> > > > > >
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Ewen
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>

Reply via email to