I added StreamsBuilder under the assumption that InternalStreamsBuilder
would be required to merge two streams. However, even if that is not the
case, I would still need a few things:

1) An InternalStreamsBuilder instance to instantiate a new KStream

2) The merge_name that the merged streams will be given

3) Access to the corresponding InternalStreamsBuilder's
InternalTopologyBuilder to add a processor (for the new KStream)

All of these are associated with InternalStreamsBuilder, so it is
essential for merging the streams.
We are left with three options (taking into account the restriction that
InternalStreamsBuilder's reference scope is mostly limited to the
org.apache.kafka.streams.kstream.internals package):

a) Find a way to pass InternalStreamsBuilder indirectly into the class
(using StreamsBuilder).

b) Within the method, find the InternalStreamsBuilder that corresponds
to the streams about to be merged.

or c) Use the local InternalStreamsBuilder inherited from AbstractStream,
assuming that it is the correct builder

From your suggestion, that would mean using option c above. This
implementation works, but it carries the risk that the local
InternalStreamsBuilder might not be the correct one (just something to
keep in mind, since I will change it).
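To make option c and its risk concrete, here is a rough sketch. ToyBuilder
and ToyStream are hypothetical stand-ins that I made up for illustration;
they are not the real InternalStreamsBuilder, AbstractStream, or KStreamImpl,
and the naming scheme is only an assumption. The point is that a non-static
merge() can use the builder the stream already holds, and can guard against
the other stream belonging to a different builder.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeSketch {

    // Hypothetical stand-in for InternalStreamsBuilder (not the Kafka class).
    static final class ToyBuilder {
        private int index = 0;
        final List<String> processors = new ArrayList<>();

        // Generates a unique node name; the format is illustrative only.
        String newName(String prefix) {
            return prefix + index++;
        }

        // Records a processor node together with its parent node names.
        void addProcessor(String name, String... parents) {
            processors.add(name + " <- " + String.join(",", parents));
        }
    }

    // Hypothetical stand-in for a stream that keeps a reference to its builder.
    static final class ToyStream {
        final ToyBuilder builder;
        final String name;

        ToyStream(ToyBuilder builder, String name) {
            this.builder = builder;
            this.name = name;
        }

        // Option c: use the builder this stream already holds.
        ToyStream merge(ToyStream other) {
            // Guard against the risk that the local builder is not the right one.
            if (other.builder != this.builder) {
                throw new IllegalArgumentException(
                    "streams belong to different builders");
            }
            String mergeName = builder.newName("MERGE-");
            builder.addProcessor(mergeName, this.name, other.name);
            return new ToyStream(builder, mergeName);
        }
    }

    public static void main(String[] args) {
        ToyBuilder builder = new ToyBuilder();
        ToyStream s1 = new ToyStream(builder, "s1");
        ToyStream s2 = new ToyStream(builder, "s2");
        ToyStream s3 = new ToyStream(builder, "s3");

        // Chained merges each add one processor to the shared builder.
        ToyStream merged = s1.merge(s2).merge(s3);
        System.out.println(merged.name);
        builder.processors.forEach(System.out::println);
    }
}
```

Whether the real implementation should check for mismatched builders, or
simply trust the inherited one, is an open design question.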

On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi Richard,
>
> Thanks a lot for the KIP!
>
> I have three questions:
>  - why is the new merge() method static?
>  - why does the new merge() method take StreamsBuilder as a parameter?
>  - did you think about Xavier's comment (see the JIRA in case you did
> not notice it yet) about varargs vs adding some overloads to merge stream?
>
> My personal take is that merge() should not be static and not take
> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>
> // old
> KStream merged = StreamsBuilder.merge(stream1, stream2);
> // new
> KStream merge = stream1.merge(stream2);
>
>
> Having pointed out the second pattern, it should actually be fine to get
> rid of varargs in merge() altogether, as users could chain multiple calls
> to merge() after each other:
>
> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>
>
>
>
> -Matthias
>
> On 9/16/17 9:36 PM, Richard Yu wrote:
> > Hi,
> > Please take a look at:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream
> >
> > Thanks
> >
>
>
