Multiple inputs per bolt is currently not supported. :(
FlinkTopologyBuilder has a bug. There is already a JIRA for it:
https://issues.apache.org/jira/browse/FLINK-2837

I know already how to fix it (hope to can get it into 0.10.1)

Removing FlinkTopologyBuilder does make sense (I did not do it because
the members we need to access are private). Your idea to get access via
reflection is good!

Btw: can you also have a look here:
https://github.com/apache/flink/pull/1387
I would like to merge this ASAP but need some feedback.

-Matthias

On 11/20/2015 07:30 PM, Maximilian Michels wrote:
> I thought about the API changes again. It probably does make sense to
> keep the LocalCluster and StormSubmitter equivalent classes. That way,
> we don't break the Storm API too much. Users can stick to the pattern
> of using either FlinkCluster to execute locally or FlinkSubmitter to
> submit remotely. Still, we can save some code by reusing Storm's
> TopologyBuilder.
> 
> I'll open a pull request with the changes. This also includes some
> more examples and features (e.g. multiple inputs per Bolt).
> 
> On Mon, Nov 16, 2015 at 4:33 PM, Maximilian Michels <m...@apache.org> wrote:
>> You are right in saying that both API approaches support executing
>> Storm jobs. However, I think the proposed changes make it much easier
>> to reuse Storm topologies. And here is why:
>>
>> 1. No existing classes need to be exchanged.
>>
>> A Storm topology stays like it is. If you already have it defined
>> somewhere, you simply pass it to the FlinkTopologyBuilder to create a
>> StreamExecutionEnvironment.
>>
>> 2. Storm and Flink have different runtime behavior.
>>
>> IMHO makes more sense to make it transparent to the user that the
>> result of the translation is an actual Flink job executed by the Flink
>> runtime. Therefore, it makes sense to stick to the Flink way of
>> executing. Hiding this fact behind Storm dummy classes can create
>> problems for the user.
>>
>> 3. Code reuse
>>
>> As you can see in the proposed changes, it makes the implementation
>> much simpler while retaining the desire functionality. That has also
>> impact of testability and maintainability.
>>
>> I can also understand your perspective. I wonder if we could get some
>> feedback from other people on the mailing list?
>>
>>
>> Let me also address your other comments and suggestions:
>>
>>> * You changed examples to use finite-spouts -- from a testing point of
>>> view this makes sense. However, the examples should show how to run an
>>> *unmodified* Storm topology in Flink.
>>
>> Good point. As far as I know we only test finite sources in the Flink
>> streaming tests. Using finite sources makes things much easier. I
>> would like to keep the tests simple like this. We can still have
>> separate tests to test the infinite attribute of the regular spouts.
>> The examples can be converted back to using the infinite spout. IMHO
>> the existing approach which involves waiting and killing of the
>> topology doesn't seem to be the cleanest solution.
>>
>>> * we should keep the local copy "unprocessedBolts" when creating a Flink
>>> program to allow to re-submit the same topology object twice (or alter
>>> it after submission). If you don't make the copy, submitting/translating
>>> the topology into a Flink job alters the object (which should not
>>> happen). And as it is not performance critical, the copying overhead
>>> does not matter.
>>
>> I didn't think about that but we can copy the spouts and bolts before
>> processing them. I've added that to my local branch. However, I didn't
>> see where this was done previously. Can you give me a hint?
>>
>>> * Why did you change the dop from 4 to 1 WordCountTopology ? We should
>>> test in parallel fashion...
>>
>> Absolutely. Already reverted this locally.
>>
>>> * Too many reformatting changes ;) You though many classes without any
>>> actual code changes.
>>
>> Yes, I ran "Optimize Imports" in IntelliJ. Sorry for that but this
>> only affects the import statements.
>>
>> I would like to open a pull request soon to merge some of the changes.
>> It would be great if some other people commented on the API changes
>> and whether we should integrate direct support for spouts/bolts in
>> DataStream. Next, I would like to test and bundle some more of the
>> examples included in Storm.
>>
>> Cheers,
>> Max
>>
>> On Sat, Nov 14, 2015 at 5:13 PM, Matthias J. Sax <mj...@apache.org> wrote:
>>> I just had a look at your proposal. It makes a lot of sense. I still
>>> believe that it is a matter of taste if one prefers your or my point of
>>> view. Both approaches allows to easily reuse and execute Storm
>>> Topologies on Flink (what is the most important feature we need to have).
>>>
>>> I hope to get some more feedback from the community, if the
>>> Strom-compatibility should be more "stormy" or more "flinky". Bot
>>> approaches make sense to me.
>>>
>>>
>>> I view minor comments:
>>>
>>> * FileSpout vs FiniteFileSpout
>>>   -> FileSpout was implemented in a Storm way -- to set the "finished"
>>> flag here does not make sense from a Storm point of view (there is no
>>> such thing as a finite spout)
>>>   Thus, this example shows how a regular Storm spout can be improved
>>> using FiniteSpout interface -- I would keep it as is (even if seems to
>>> be unnecessary complicated -- imagine that you don't have the code of
>>> FileSpout)
>>>
>>> * You changed examples to use finite-spouts -- from a testing point of
>>> view this makes sense. However, the examples should show how to run an
>>> *unmodified* Storm topology in Flink.
>>>
>>> * we should keep the local copy "unprocessedBolts" when creating a Flink
>>> program to allow to re-submit the same topology object twice (or alter
>>> it after submission). If you don't make the copy, submitting/translating
>>> the topology into a Flink job alters the object (which should not
>>> happen). And as it is not performance critical, the copying overhead
>>> does not matter.
>>>
>>> * Why did you change the dop from 4 to 1 WordCountTopology ? We should
>>> test in parallel fashion...
>>>
>>> * Too many reformatting changes ;) You though many classes without any
>>> actual code changes.
>>>
>>>
>>>
>>>
>>>
>>>
>>> -------- Forwarded Message --------
>>> Subject: Re: Storm Compatibility
>>> Date: Fri, 13 Nov 2015 12:15:19 +0100
>>> From: Maximilian Michels <m...@apache.org>
>>> To: Matthias J. Sax <mj...@apache.org>
>>> CC: Stephan Ewen <se...@apache.org>, Robert Metzger <rmetz...@apache.org>
>>>
>>> Hi Matthias,
>>>
>>> Thank you for your remarks.
>>>
>>> I believe the goal of the compatibility layer should not be to mimic
>>> Storm's API but to easily execute Storm typologies using Flink. I see
>>> that it is easy for users to use class names for execution they know
>>> from Storm but I think this makes the API verbose. I've refactored it
>>> a bit to make it more aligned with Flink's execution model. After all,
>>> the most important thing is that it makes it easy for people to reuse
>>> Storm typologies while getting all the advantages of Flink.
>>>
>>> Let me explain what I have done so far:
>>> https://github.com/apache/flink/compare/master...mxm:storm-dev
>>>
>>> API
>>> - remove FlinkClient, FlinkSubmitter, FlinkLocalCluster,
>>> FlinkTopology: They are not necessary in my opinion and are
>>> replicating functionality already included in Flink or Storm.
>>>
>>> - Build the topology with the Storm TopologyBuilder (instead of
>>> FlinkTopology) which is then passed to the FlinkTopologyBuilder which
>>> generates the StreamExecutionEnvironment containing the StreamGraph.
>>> You can then simply call execute() like you would usually do in Flink.
>>> This lets you reuse your Storm typologies with the ease of Flink
>>> context-based execution mechanism. Note that it works in local and
>>> remote execution mode without changing any code.
>>>
>>> Tests
>>> - replaced StormTestBase.java with StreamingTestBase
>>> - use a Finite source for the tests and changed it a bit
>>>
>>> Examples
>>> - Convert examples to new API
>>> - Remove duplicate examples (local and remote)
>>>
>>> I hope these changes are not too invasive for you. I think it makes
>>> the compatibility layer much easier to use. Let me know what you think
>>> about it. Of course, we can iterate on it.
>>>
>>> About the integration of the compatibility layer into DataStream:
>>> Wouldn't it be possible to set storm to provided and let the user
>>> include the jar if he/she wants to use the Storm compatibility? That's
>>> also what we do for other libraries like Gelly. You have to package
>>> them into the JAR if you want to run them on the cluster. We should
>>> give a good error message if classes cannot be found.
>>>
>>> +1 for moving the discussion to the dev list.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Fri, Nov 13, 2015 at 7:41 AM, Matthias J. Sax <mj...@apache.org> wrote:
>>>> One more thing that just came to my mind about (1): I have to correct my
>>>> last reply on it:
>>>>
>>>> We **cannot reuse** TopologyBuilder because the returned StormTopology
>>>> from .createTopology() does **not** contain the references to the
>>>> Spout/Bolt object. Internally, those are already serialized into an
>>>> internal Thrift representation (as preparation to get sent to Nimbus).
>>>> However, in order to create a Flink job, we need the references of 
>>>> course...
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 11/11/2015 04:33 PM, Maximilian Michels wrote:
>>>>> Hi Matthias,
>>>>>
>>>>> Sorry for getting back to you late. I'm very new to Storm but have
>>>>> familiarized myself a bit the last days. While looking through the
>>>>> Storm examples and the compatibility layer I discovered the following
>>>>> issues:
>>>>>
>>>>> 1) The compatibility layer mirrors the Storm API instead of reusing
>>>>> it. Why do we need a FlinkTopologyBuilder, FlinkCluster,
>>>>> FlinkSubmitter, FlinkClient? Couldn't all these user-facing classes by
>>>>> replaced by e.g. StormExecutionEnvironment which receives the Storm
>>>>> topology and upon getStreamGraph() just traverses it?
>>>>>
>>>>> 2) DRPC is not yet supported. I don't know how crucial this is but it
>>>>> seems to be widespread Storm feature. If we wrapped the entire Storm
>>>>> topology, we could give appropriate errors when we see such
>>>>> unsupported features.
>>>>>
>>>>> 3) We could simplify embedding Spouts and Bolts directly as operator
>>>>> functions. Users shouldn't have to worry about extracting the types.
>>>>> Perhaps we could implement a dedicated method to add spouts/bolts on
>>>>> DataStream?
>>>>>
>>>>> 5) Performance: The BoltWrapper creates a StormTuple for every
>>>>> incoming record. I think this could be improved. Couldn't we use the
>>>>> StormTuple as data type instead of Flink's tuples?
>>>>>
>>>>> 6) Trident Examples. Have you run any?
>>>>>
>>>>> That's it for now. I'm sure you know about many more improvements or
>>>>> problems because you're the expert on this. In the meantime, I'll try
>>>>> to contact you via IRC.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Fri, Nov 6, 2015 at 6:25 PM, Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> that sounds great! I am very happy that people are interested in it and
>>>>>> start to use it! Can you give some more details about this? I am just
>>>>>> aware of a few question at SO. But there was no question about it on the
>>>>>> mailing list lately... Did you get some more internal questions/feedback?
>>>>>>
>>>>>> And of course, other people should get involved as well! There is so
>>>>>> much too do -- even if I work 40h a week on it, I cannot get everything
>>>>>> done by myself. The last days were very busy for me. I hope I can work
>>>>>> on a couple of bugs after the Munich Meetup. I started to look into them
>>>>>> already...
>>>>>>
>>>>>> Should we start a roadmap in the Wiki? This might be helpful if more
>>>>>> people get involved.
>>>>>>
>>>>>> And thanks for keeping me in the loop :)
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 11/06/2015 03:49 PM, Stephan Ewen wrote:
>>>>>>> Hi Matthias!
>>>>>>>
>>>>>>> We are seeing a lot of people getting very excited about the Storm
>>>>>>> Compatibility layer. I expect that quite a few people will seriously
>>>>>>> start to work with it.
>>>>>>>
>>>>>>> I would suggest that we also start getting involved in that. Since you
>>>>>>> have of course your priority on your Ph.D., it would be a little much
>>>>>>> asked from you to dedicate a lot of time to support more features, be
>>>>>>> super responsive with users all the time, etc.
>>>>>>>
>>>>>>> To that end, some people from us will start testing the API, adding
>>>>>>> fixes, etc (which also helps us to understand this better when users ask
>>>>>>> questions).
>>>>>>> We would definitely like for you to stay involved (we don't want to
>>>>>>> hijack this), and help with ideas, especially when it comes to things
>>>>>>> like fault tolerance design, etc.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>
>>>>
>>>
>>>
>>>
>>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to