FYI, I just saw this email chain and thought I'd share my experience. I used the Storm Flink API a few days ago. A simple example worked well; I will be testing a few more next week.
One thing to note: I had to include all Scala dependencies in the Storm topology, since the FlinkLocalCluster.java class uses LocalFlinkMiniCluster.scala. Not sure if this is an issue, but after including the Scala dependencies everything worked well. ;)

On Fri, Nov 20, 2015 at 4:12 PM, Matthias J. Sax <mj...@apache.org> wrote:
> Multiple inputs per bolt are currently not supported. :(
> FlinkTopologyBuilder has a bug. There is already a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-2837
>
> I already know how to fix it (I hope I can get it into 0.10.1).
>
> Removing FlinkTopologyBuilder does make sense (I did not do it because
> the members we need to access are private). Your idea to get access via
> reflection is good!
>
> Btw: can you also have a look here:
> https://github.com/apache/flink/pull/1387
> I would like to merge this ASAP but need some feedback.
>
> -Matthias
>
> On 11/20/2015 07:30 PM, Maximilian Michels wrote:
> > I thought about the API changes again. It probably does make sense to
> > keep the LocalCluster and StormSubmitter equivalent classes. That way,
> > we don't break the Storm API too much. Users can stick to the pattern
> > of using either FlinkCluster to execute locally or FlinkSubmitter to
> > submit remotely. Still, we can save some code by reusing Storm's
> > TopologyBuilder.
> >
> > I'll open a pull request with the changes. This also includes some
> > more examples and features (e.g. multiple inputs per Bolt).
> >
> > On Mon, Nov 16, 2015 at 4:33 PM, Maximilian Michels <m...@apache.org> wrote:
> >> You are right in saying that both API approaches support executing
> >> Storm jobs. However, I think the proposed changes make it much easier
> >> to reuse Storm topologies. And here is why:
> >>
> >> 1. No existing classes need to be exchanged.
> >>
> >> A Storm topology stays like it is. If you already have it defined
> >> somewhere, you simply pass it to the FlinkTopologyBuilder to create a
> >> StreamExecutionEnvironment.
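The reflection idea mentioned above can be sketched in plain Java. The class below is a stand-in, not Storm's real TopologyBuilder; the field name `_bolts` follows the Storm 0.9.x sources but should be treated as an assumption, and Strings take the place of bolt objects to keep the sketch self-contained:

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class ReflectionSketch {

    // Stand-in for Storm's TopologyBuilder, whose internal maps are private.
    static class TopologyBuilderStandIn {
        private final Map<String, String> _bolts = new HashMap<>();

        void setBolt(String id, String bolt) {
            _bolts.put(id, bolt);
        }
    }

    // Pull the private map out via reflection, as the compatibility layer
    // could do to translate the registered bolts into Flink operators.
    @SuppressWarnings("unchecked")
    static Map<String, String> extractBolts(TopologyBuilderStandIn builder) throws Exception {
        Field f = TopologyBuilderStandIn.class.getDeclaredField("_bolts");
        f.setAccessible(true); // bypass the private access modifier
        return (Map<String, String>) f.get(builder);
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilderStandIn builder = new TopologyBuilderStandIn();
        builder.setBolt("counter", "WordCountBolt");
        System.out.println(extractBolts(builder).get("counter"));
    }
}
```

In the real layer the extracted map would hold IRichBolt instances to be wrapped into Flink operators; the mechanics of `getDeclaredField` plus `setAccessible(true)` are the same.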
> >>
> >> 2. Storm and Flink have different runtime behavior.
> >>
> >> IMHO it makes more sense to make it transparent to the user that the
> >> result of the translation is an actual Flink job executed by the Flink
> >> runtime. Therefore, it makes sense to stick to the Flink way of
> >> executing. Hiding this fact behind Storm dummy classes can create
> >> problems for the user.
> >>
> >> 3. Code reuse.
> >>
> >> As you can see in the proposed changes, it makes the implementation
> >> much simpler while retaining the desired functionality. That also has
> >> an impact on testability and maintainability.
> >>
> >> I can also understand your perspective. I wonder if we could get some
> >> feedback from other people on the mailing list?
> >>
> >> Let me also address your other comments and suggestions:
> >>
> >>> * You changed examples to use finite spouts -- from a testing point of
> >>> view this makes sense. However, the examples should show how to run an
> >>> *unmodified* Storm topology in Flink.
> >>
> >> Good point. As far as I know we only test finite sources in the Flink
> >> streaming tests. Using finite sources makes things much easier. I
> >> would like to keep the tests simple like this. We can still have
> >> separate tests for the infinite behavior of the regular spouts.
> >> The examples can be converted back to using the infinite spout. IMHO
> >> the existing approach, which involves waiting and then killing the
> >> topology, doesn't seem to be the cleanest solution.
> >>
> >>> * We should keep the local copy "unprocessedBolts" when creating a Flink
> >>> program, to allow re-submitting the same topology object twice (or altering
> >>> it after submission). If you don't make the copy, submitting/translating
> >>> the topology into a Flink job alters the object (which should not
> >>> happen). And as it is not performance critical, the copying overhead
> >>> does not matter.
> >>
> >> I didn't think about that, but we can copy the spouts and bolts before
> >> processing them. I've added that to my local branch. However, I didn't
> >> see where this was done previously. Can you give me a hint?
> >>
> >>> * Why did you change the dop from 4 to 1 in WordCountTopology? We should
> >>> test in a parallel fashion...
> >>
> >> Absolutely. Already reverted this locally.
> >>
> >>> * Too many reformatting changes ;) You touched many classes without any
> >>> actual code changes.
> >>
> >> Yes, I ran "Optimize Imports" in IntelliJ. Sorry for that, but this
> >> only affects the import statements.
> >>
> >> I would like to open a pull request soon to merge some of the changes.
> >> It would be great if some other people commented on the API changes
> >> and on whether we should integrate direct support for spouts/bolts in
> >> DataStream. Next, I would like to test and bundle some more of the
> >> examples included in Storm.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Sat, Nov 14, 2015 at 5:13 PM, Matthias J. Sax <mj...@apache.org> wrote:
> >>> I just had a look at your proposal. It makes a lot of sense. I still
> >>> believe that it is a matter of taste whether one prefers your point of
> >>> view or mine. Both approaches allow us to easily reuse and execute Storm
> >>> topologies on Flink (which is the most important feature we need to have).
> >>>
> >>> I hope to get some more feedback from the community on whether the
> >>> Storm compatibility should be more "stormy" or more "flinky". Both
> >>> approaches make sense to me.
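The "unprocessedBolts" point above amounts to making a defensive copy before the translation loop, so that translating the topology does not mutate the builder's state. A minimal, self-contained sketch (the translate step is a stand-in for the actual Flink translation; the real code consumes IRichBolt instances, not Strings):

```java
import java.util.HashMap;
import java.util.Map;

public class CopyBeforeTranslate {

    // Simulates translating a topology: entries are consumed (removed) as
    // they are turned into operators. Working on a copy keeps the caller's
    // map intact, so the same topology object can be submitted twice.
    static int translate(Map<String, String> bolts) {
        Map<String, String> unprocessedBolts = new HashMap<>(bolts); // defensive copy
        int translated = 0;
        while (!unprocessedBolts.isEmpty()) {
            String id = unprocessedBolts.keySet().iterator().next();
            unprocessedBolts.remove(id); // consume the copy, not the original
            translated++;
        }
        return translated;
    }

    public static void main(String[] args) {
        Map<String, String> bolts = new HashMap<>();
        bolts.put("split", "SplitBolt");
        bolts.put("count", "CountBolt");
        System.out.println(translate(bolts)); // first submission
        System.out.println(translate(bolts)); // second submission still sees both bolts
    }
}
```

As the mail says, the copy is cheap relative to job submission, so the overhead is irrelevant.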
> >>>
> >>> A few minor comments:
> >>>
> >>> * FileSpout vs. FiniteFileSpout
> >>> -> FileSpout was implemented in a Storm way -- setting a "finished"
> >>> flag here does not make sense from a Storm point of view (there is no
> >>> such thing as a finite spout).
> >>> Thus, this example shows how a regular Storm spout can be improved
> >>> using the FiniteSpout interface -- I would keep it as is (even if it
> >>> seems unnecessarily complicated -- imagine that you don't have the
> >>> code of FileSpout).
> >>>
> >>> * You changed examples to use finite spouts -- from a testing point of
> >>> view this makes sense. However, the examples should show how to run an
> >>> *unmodified* Storm topology in Flink.
> >>>
> >>> * We should keep the local copy "unprocessedBolts" when creating a Flink
> >>> program, to allow re-submitting the same topology object twice (or altering
> >>> it after submission). If you don't make the copy, submitting/translating
> >>> the topology into a Flink job alters the object (which should not
> >>> happen). And as it is not performance critical, the copying overhead
> >>> does not matter.
> >>>
> >>> * Why did you change the dop from 4 to 1 in WordCountTopology? We should
> >>> test in a parallel fashion...
> >>>
> >>> * Too many reformatting changes ;) You touched many classes without any
> >>> actual code changes.
> >>>
> >>> -------- Forwarded Message --------
> >>> Subject: Re: Storm Compatibility
> >>> Date: Fri, 13 Nov 2015 12:15:19 +0100
> >>> From: Maximilian Michels <m...@apache.org>
> >>> To: Matthias J. Sax <mj...@apache.org>
> >>> CC: Stephan Ewen <se...@apache.org>, Robert Metzger <rmetz...@apache.org>
> >>>
> >>> Hi Matthias,
> >>>
> >>> Thank you for your remarks.
> >>>
> >>> I believe the goal of the compatibility layer should not be to mimic
> >>> Storm's API but to easily execute Storm topologies using Flink.
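The FiniteSpout idea in the first bullet can be illustrated with stand-in interfaces. The real `FiniteSpout` lives in the Storm compatibility module and Storm's actual ISpout contract is richer than the single method shown; the names below are simplified placeholders:

```java
public class FiniteSpoutSketch {

    // Minimal stand-ins: a Storm-style spout only emits tuples forever,
    // while the compatibility layer's finite variant adds reachedEnd()
    // so the Flink source can terminate cleanly.
    interface SpoutLike {
        String nextTuple();
    }

    interface FiniteSpoutLike extends SpoutLike {
        boolean reachedEnd();
    }

    // A FiniteFileSpout-style source over a fixed list of lines.
    static class LineSpout implements FiniteSpoutLike {
        private final String[] lines;
        private int pos = 0;

        LineSpout(String... lines) {
            this.lines = lines;
        }

        public String nextTuple() {
            return lines[pos++];
        }

        public boolean reachedEnd() {
            return pos >= lines.length;
        }
    }

    // A wrapper can drive the spout to completion instead of waiting
    // for some time and then killing the topology.
    static int runToCompletion(FiniteSpoutLike spout) {
        int emitted = 0;
        while (!spout.reachedEnd()) {
            spout.nextTuple();
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runToCompletion(new LineSpout("a", "b", "c")));
    }
}
```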
> >>> I see that it is easy for users to use class names for execution that
> >>> they know from Storm, but I think this makes the API verbose. I've
> >>> refactored it a bit to make it more aligned with Flink's execution
> >>> model. After all, the most important thing is that it makes it easy
> >>> for people to reuse Storm topologies while getting all the advantages
> >>> of Flink.
> >>>
> >>> Let me explain what I have done so far:
> >>> https://github.com/apache/flink/compare/master...mxm:storm-dev
> >>>
> >>> API
> >>> - Remove FlinkClient, FlinkSubmitter, FlinkLocalCluster,
> >>> FlinkTopology: they are not necessary in my opinion and
> >>> replicate functionality already included in Flink or Storm.
> >>>
> >>> - Build the topology with the Storm TopologyBuilder (instead of
> >>> FlinkTopology), which is then passed to the FlinkTopologyBuilder, which
> >>> generates the StreamExecutionEnvironment containing the StreamGraph.
> >>> You can then simply call execute() like you would usually do in Flink.
> >>> This lets you reuse your Storm topologies with the ease of Flink's
> >>> context-based execution mechanism. Note that it works in local and
> >>> remote execution mode without changing any code.
> >>>
> >>> Tests
> >>> - Replaced StormTestBase.java with StreamingTestBase
> >>> - Use a finite source for the tests and changed it a bit
> >>>
> >>> Examples
> >>> - Converted examples to the new API
> >>> - Removed duplicate examples (local and remote)
> >>>
> >>> I hope these changes are not too invasive for you. I think they make
> >>> the compatibility layer much easier to use. Let me know what you think
> >>> about it. Of course, we can iterate on it.
> >>>
> >>> About the integration of the compatibility layer into DataStream:
> >>> Wouldn't it be possible to set Storm to "provided" and let users
> >>> include the jar if they want to use the Storm compatibility? That's
> >>> also what we do for other libraries like Gelly.
> >>> You have to package
> >>> them into the JAR if you want to run them on the cluster. We should
> >>> give a good error message if classes cannot be found.
> >>>
> >>> +1 for moving the discussion to the dev list.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On Fri, Nov 13, 2015 at 7:41 AM, Matthias J. Sax <mj...@apache.org> wrote:
> >>>> One more thing that just came to my mind about (1); I have to correct my
> >>>> last reply on it:
> >>>>
> >>>> We **cannot reuse** TopologyBuilder, because the StormTopology returned
> >>>> by createTopology() does **not** contain the references to the
> >>>> Spout/Bolt objects. Internally, those are already serialized into an
> >>>> internal Thrift representation (in preparation for being sent to Nimbus).
> >>>> However, in order to create a Flink job, we need the references, of course...
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 11/11/2015 04:33 PM, Maximilian Michels wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Sorry for getting back to you late. I'm very new to Storm but have
> >>>>> familiarized myself a bit over the last few days. While looking through
> >>>>> the Storm examples and the compatibility layer I discovered the
> >>>>> following issues:
> >>>>>
> >>>>> 1) The compatibility layer mirrors the Storm API instead of reusing
> >>>>> it. Why do we need a FlinkTopologyBuilder, FlinkCluster,
> >>>>> FlinkSubmitter, FlinkClient? Couldn't all these user-facing classes
> >>>>> be replaced by e.g. a StormExecutionEnvironment which receives the
> >>>>> Storm topology and upon getStreamGraph() just traverses it?
> >>>>>
> >>>>> 2) DRPC is not yet supported. I don't know how crucial this is, but
> >>>>> it seems to be a widespread Storm feature. If we wrapped the entire
> >>>>> Storm topology, we could give appropriate errors when we see such
> >>>>> unsupported features.
> >>>>>
> >>>>> 3) We could simplify embedding Spouts and Bolts directly as operator
> >>>>> functions. Users shouldn't have to worry about extracting the types.
> >>>>> Perhaps we could implement a dedicated method to add spouts/bolts on
> >>>>> DataStream?
> >>>>>
> >>>>> 5) Performance: The BoltWrapper creates a StormTuple for every
> >>>>> incoming record. I think this could be improved. Couldn't we use the
> >>>>> StormTuple as the data type instead of Flink's tuples?
> >>>>>
> >>>>> 6) Trident examples: have you run any?
> >>>>>
> >>>>> That's it for now. I'm sure you know about many more improvements or
> >>>>> problems because you're the expert on this. In the meantime, I'll try
> >>>>> to contact you via IRC.
> >>>>>
> >>>>> Cheers,
> >>>>> Max
> >>>>>
> >>>>> On Fri, Nov 6, 2015 at 6:25 PM, Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> That sounds great! I am very happy that people are interested in it
> >>>>>> and starting to use it! Can you give some more details about this?
> >>>>>> I am just aware of a few questions on SO. But there was no question
> >>>>>> about it on the mailing list lately... Did you get some more internal
> >>>>>> questions/feedback?
> >>>>>>
> >>>>>> And of course, other people should get involved as well! There is so
> >>>>>> much to do -- even if I work 40h a week on it, I cannot get everything
> >>>>>> done by myself. The last days were very busy for me. I hope I can work
> >>>>>> on a couple of bugs after the Munich Meetup. I have started to look
> >>>>>> into them already...
> >>>>>>
> >>>>>> Should we start a roadmap in the wiki? This might be helpful if more
> >>>>>> people get involved.
> >>>>>>
> >>>>>> And thanks for keeping me in the loop :)
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 11/06/2015 03:49 PM, Stephan Ewen wrote:
> >>>>>>> Hi Matthias!
> >>>>>>>
> >>>>>>> We are seeing a lot of people getting very excited about the Storm
> >>>>>>> compatibility layer. I expect that quite a few people will seriously
> >>>>>>> start to work with it.
> >>>>>>>
> >>>>>>> I would suggest that we also start getting involved in that.
> >>>>>>> Since you of course have your priority on your Ph.D., it would be
> >>>>>>> a bit much to ask of you to dedicate a lot of time to supporting
> >>>>>>> more features, being super responsive with users all the time, etc.
> >>>>>>>
> >>>>>>> To that end, some people from our side will start testing the API,
> >>>>>>> adding fixes, etc. (which also helps us to understand this better
> >>>>>>> when users ask questions).
> >>>>>>> We would definitely like for you to stay involved (we don't want to
> >>>>>>> hijack this), and to help with ideas, especially when it comes to
> >>>>>>> things like fault tolerance design, etc.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Greetings,
> >>>>>>> Stephan