FYI, I just saw this email chain and thought of sharing my exp. I used the
Storm Flink API few days ago. Just a simple example worked well, however I
will be testing few more next week.

One thing to note is, I had to include all Scala dependencies in the storm
topology since FlinkLocalCluster.java class has LocalFlinkMiniCluster.scala


Not sure if this is an issue but after including scala dependencies
everything worked well. ;)


On Fri, Nov 20, 2015 at 4:12 PM, Matthias J. Sax <mj...@apache.org> wrote:

> Multiple inputs per bolt is currently not supported. :(
> FlinkTopologyBuilder has a bug. There is already a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-2837
>
> I know already how to fix it (hope to can get it into 0.10.1)
>
> Removing FlinkTopologyBuilder does make sense (I did not do it because
> the members we need to access are private). Your idea to get access via
> reflection is good!
>
> Btw: can you also have a look here:
> https://github.com/apache/flink/pull/1387
> I would like to merge this ASAP but need some feedback.
>
> -Matthias
>
> On 11/20/2015 07:30 PM, Maximilian Michels wrote:
> > I thought about the API changes again. It probably does make sense to
> > keep the LocalCluster and StormSubmitter equivalent classes. That way,
> > we don't break the Storm API too much. Users can stick to the pattern
> > of using either FlinkCluster to execute locally or FlinkSubmitter to
> > submit remotely. Still, we can save some code by reusing Storm's
> > TopologyBuilder.
> >
> > I'll open a pull request with the changes. This also includes some
> > more examples and features (e.g. multiple inputs per Bolt).
> >
> > On Mon, Nov 16, 2015 at 4:33 PM, Maximilian Michels <m...@apache.org>
> wrote:
> >> You are right in saying that both API approaches support executing
> >> Storm jobs. However, I think the proposed changes make it much easier
> >> to reuse Storm topologies. And here is why:
> >>
> >> 1. No existing classes need to be exchanged.
> >>
> >> A Storm topology stays like it is. If you already have it defined
> >> somewhere, you simply pass it to the FlinkTopologyBuilder to create a
> >> StreamExecutionEnvironment.
> >>
> >> 2. Storm and Flink have different runtime behavior.
> >>
> >> IMHO makes more sense to make it transparent to the user that the
> >> result of the translation is an actual Flink job executed by the Flink
> >> runtime. Therefore, it makes sense to stick to the Flink way of
> >> executing. Hiding this fact behind Storm dummy classes can create
> >> problems for the user.
> >>
> >> 3. Code reuse
> >>
> >> As you can see in the proposed changes, it makes the implementation
> >> much simpler while retaining the desire functionality. That has also
> >> impact of testability and maintainability.
> >>
> >> I can also understand your perspective. I wonder if we could get some
> >> feedback from other people on the mailing list?
> >>
> >>
> >> Let me also address your other comments and suggestions:
> >>
> >>> * You changed examples to use finite-spouts -- from a testing point of
> >>> view this makes sense. However, the examples should show how to run an
> >>> *unmodified* Storm topology in Flink.
> >>
> >> Good point. As far as I know we only test finite sources in the Flink
> >> streaming tests. Using finite sources makes things much easier. I
> >> would like to keep the tests simple like this. We can still have
> >> separate tests to test the infinite attribute of the regular spouts.
> >> The examples can be converted back to using the infinite spout. IMHO
> >> the existing approach which involves waiting and killing of the
> >> topology doesn't seem to be the cleanest solution.
> >>
> >>> * we should keep the local copy "unprocessedBolts" when creating a
> Flink
> >>> program to allow to re-submit the same topology object twice (or alter
> >>> it after submission). If you don't make the copy,
> submitting/translating
> >>> the topology into a Flink job alters the object (which should not
> >>> happen). And as it is not performance critical, the copying overhead
> >>> does not matter.
> >>
> >> I didn't think about that but we can copy the spouts and bolts before
> >> processing them. I've added that to my local branch. However, I didn't
> >> see where this was done previously. Can you give me a hint?
> >>
> >>> * Why did you change the dop from 4 to 1 WordCountTopology ? We should
> >>> test in parallel fashion...
> >>
> >> Absolutely. Already reverted this locally.
> >>
> >>> * Too many reformatting changes ;) You though many classes without any
> >>> actual code changes.
> >>
> >> Yes, I ran "Optimize Imports" in IntelliJ. Sorry for that but this
> >> only affects the import statements.
> >>
> >> I would like to open a pull request soon to merge some of the changes.
> >> It would be great if some other people commented on the API changes
> >> and whether we should integrate direct support for spouts/bolts in
> >> DataStream. Next, I would like to test and bundle some more of the
> >> examples included in Storm.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Sat, Nov 14, 2015 at 5:13 PM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >>> I just had a look at your proposal. It makes a lot of sense. I still
> >>> believe that it is a matter of taste if one prefers your or my point of
> >>> view. Both approaches allows to easily reuse and execute Storm
> >>> Topologies on Flink (what is the most important feature we need to
> have).
> >>>
> >>> I hope to get some more feedback from the community, if the
> >>> Strom-compatibility should be more "stormy" or more "flinky". Bot
> >>> approaches make sense to me.
> >>>
> >>>
> >>> I view minor comments:
> >>>
> >>> * FileSpout vs FiniteFileSpout
> >>>   -> FileSpout was implemented in a Storm way -- to set the "finished"
> >>> flag here does not make sense from a Storm point of view (there is no
> >>> such thing as a finite spout)
> >>>   Thus, this example shows how a regular Storm spout can be improved
> >>> using FiniteSpout interface -- I would keep it as is (even if seems to
> >>> be unnecessary complicated -- imagine that you don't have the code of
> >>> FileSpout)
> >>>
> >>> * You changed examples to use finite-spouts -- from a testing point of
> >>> view this makes sense. However, the examples should show how to run an
> >>> *unmodified* Storm topology in Flink.
> >>>
> >>> * we should keep the local copy "unprocessedBolts" when creating a
> Flink
> >>> program to allow to re-submit the same topology object twice (or alter
> >>> it after submission). If you don't make the copy,
> submitting/translating
> >>> the topology into a Flink job alters the object (which should not
> >>> happen). And as it is not performance critical, the copying overhead
> >>> does not matter.
> >>>
> >>> * Why did you change the dop from 4 to 1 WordCountTopology ? We should
> >>> test in parallel fashion...
> >>>
> >>> * Too many reformatting changes ;) You though many classes without any
> >>> actual code changes.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> -------- Forwarded Message --------
> >>> Subject: Re: Storm Compatibility
> >>> Date: Fri, 13 Nov 2015 12:15:19 +0100
> >>> From: Maximilian Michels <m...@apache.org>
> >>> To: Matthias J. Sax <mj...@apache.org>
> >>> CC: Stephan Ewen <se...@apache.org>, Robert Metzger <
> rmetz...@apache.org>
> >>>
> >>> Hi Matthias,
> >>>
> >>> Thank you for your remarks.
> >>>
> >>> I believe the goal of the compatibility layer should not be to mimic
> >>> Storm's API but to easily execute Storm typologies using Flink. I see
> >>> that it is easy for users to use class names for execution they know
> >>> from Storm but I think this makes the API verbose. I've refactored it
> >>> a bit to make it more aligned with Flink's execution model. After all,
> >>> the most important thing is that it makes it easy for people to reuse
> >>> Storm typologies while getting all the advantages of Flink.
> >>>
> >>> Let me explain what I have done so far:
> >>> https://github.com/apache/flink/compare/master...mxm:storm-dev
> >>>
> >>> API
> >>> - remove FlinkClient, FlinkSubmitter, FlinkLocalCluster,
> >>> FlinkTopology: They are not necessary in my opinion and are
> >>> replicating functionality already included in Flink or Storm.
> >>>
> >>> - Build the topology with the Storm TopologyBuilder (instead of
> >>> FlinkTopology) which is then passed to the FlinkTopologyBuilder which
> >>> generates the StreamExecutionEnvironment containing the StreamGraph.
> >>> You can then simply call execute() like you would usually do in Flink.
> >>> This lets you reuse your Storm typologies with the ease of Flink
> >>> context-based execution mechanism. Note that it works in local and
> >>> remote execution mode without changing any code.
> >>>
> >>> Tests
> >>> - replaced StormTestBase.java with StreamingTestBase
> >>> - use a Finite source for the tests and changed it a bit
> >>>
> >>> Examples
> >>> - Convert examples to new API
> >>> - Remove duplicate examples (local and remote)
> >>>
> >>> I hope these changes are not too invasive for you. I think it makes
> >>> the compatibility layer much easier to use. Let me know what you think
> >>> about it. Of course, we can iterate on it.
> >>>
> >>> About the integration of the compatibility layer into DataStream:
> >>> Wouldn't it be possible to set storm to provided and let the user
> >>> include the jar if he/she wants to use the Storm compatibility? That's
> >>> also what we do for other libraries like Gelly. You have to package
> >>> them into the JAR if you want to run them on the cluster. We should
> >>> give a good error message if classes cannot be found.
> >>>
> >>> +1 for moving the discussion to the dev list.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On Fri, Nov 13, 2015 at 7:41 AM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>> One more thing that just came to my mind about (1): I have to correct
> my
> >>>> last reply on it:
> >>>>
> >>>> We **cannot reuse** TopologyBuilder because the returned StormTopology
> >>>> from .createTopology() does **not** contain the references to the
> >>>> Spout/Bolt object. Internally, those are already serialized into an
> >>>> internal Thrift representation (as preparation to get sent to Nimbus).
> >>>> However, in order to create a Flink job, we need the references of
> course...
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 11/11/2015 04:33 PM, Maximilian Michels wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Sorry for getting back to you late. I'm very new to Storm but have
> >>>>> familiarized myself a bit the last days. While looking through the
> >>>>> Storm examples and the compatibility layer I discovered the following
> >>>>> issues:
> >>>>>
> >>>>> 1) The compatibility layer mirrors the Storm API instead of reusing
> >>>>> it. Why do we need a FlinkTopologyBuilder, FlinkCluster,
> >>>>> FlinkSubmitter, FlinkClient? Couldn't all these user-facing classes
> by
> >>>>> replaced by e.g. StormExecutionEnvironment which receives the Storm
> >>>>> topology and upon getStreamGraph() just traverses it?
> >>>>>
> >>>>> 2) DRPC is not yet supported. I don't know how crucial this is but it
> >>>>> seems to be widespread Storm feature. If we wrapped the entire Storm
> >>>>> topology, we could give appropriate errors when we see such
> >>>>> unsupported features.
> >>>>>
> >>>>> 3) We could simplify embedding Spouts and Bolts directly as operator
> >>>>> functions. Users shouldn't have to worry about extracting the types.
> >>>>> Perhaps we could implement a dedicated method to add spouts/bolts on
> >>>>> DataStream?
> >>>>>
> >>>>> 5) Performance: The BoltWrapper creates a StormTuple for every
> >>>>> incoming record. I think this could be improved. Couldn't we use the
> >>>>> StormTuple as data type instead of Flink's tuples?
> >>>>>
> >>>>> 6) Trident Examples. Have you run any?
> >>>>>
> >>>>> That's it for now. I'm sure you know about many more improvements or
> >>>>> problems because you're the expert on this. In the meantime, I'll try
> >>>>> to contact you via IRC.
> >>>>>
> >>>>> Cheers,
> >>>>> Max
> >>>>>
> >>>>> On Fri, Nov 6, 2015 at 6:25 PM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> that sounds great! I am very happy that people are interested in it
> and
> >>>>>> start to use it! Can you give some more details about this? I am
> just
> >>>>>> aware of a few question at SO. But there was no question about it
> on the
> >>>>>> mailing list lately... Did you get some more internal
> questions/feedback?
> >>>>>>
> >>>>>> And of course, other people should get involved as well! There is so
> >>>>>> much too do -- even if I work 40h a week on it, I cannot get
> everything
> >>>>>> done by myself. The last days were very busy for me. I hope I can
> work
> >>>>>> on a couple of bugs after the Munich Meetup. I started to look into
> them
> >>>>>> already...
> >>>>>>
> >>>>>> Should we start a roadmap in the Wiki? This might be helpful if more
> >>>>>> people get involved.
> >>>>>>
> >>>>>> And thanks for keeping me in the loop :)
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 11/06/2015 03:49 PM, Stephan Ewen wrote:
> >>>>>>> Hi Matthias!
> >>>>>>>
> >>>>>>> We are seeing a lot of people getting very excited about the Storm
> >>>>>>> Compatibility layer. I expect that quite a few people will
> seriously
> >>>>>>> start to work with it.
> >>>>>>>
> >>>>>>> I would suggest that we also start getting involved in that. Since
> you
> >>>>>>> have of course your priority on your Ph.D., it would be a little
> much
> >>>>>>> asked from you to dedicate a lot of time to support more features,
> be
> >>>>>>> super responsive with users all the time, etc.
> >>>>>>>
> >>>>>>> To that end, some people from us will start testing the API, adding
> >>>>>>> fixes, etc (which also helps us to understand this better when
> users ask
> >>>>>>> questions).
> >>>>>>> We would definitely like for you to stay involved (we don't want to
> >>>>>>> hijack this), and help with ideas, especially when it comes to
> things
> >>>>>>> like fault tolerance design, etc.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Greetings,
> >>>>>>> Stephan
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>>
> >>>
> >>>
>
>

Reply via email to