Also, I think there can be instances where some spouts/bolts run on JVM 1 and some on JVM 2, and so on...
Is it possible for spouts and bolts running on the same JVM to access the same ApplicationContext? I am thinking that I can put the place where I launch my Spring Boot application inside a singleton class, so that all the spouts and bolts running on, say, JVM 1 will have access to the same context (instead of launching it in every spout and bolt). Those on JVM 2 will still initialize it once, and all the rest will get the same ApplicationContext.

All of the above is a theoretical assumption. I still need to try it out (unfortunately I don't have a cluster setup at my end), but please let me know whether this can work.

Thanks
Ankur

On Sun, Oct 11, 2015 at 11:48 AM, Ankur Garg <[email protected]> wrote:

> Thanks for replying, Ravi.
>
> I think your suggestion to write a wrapper that reads JSON or XML is a very nice idea indeed.
>
> But the problem for me here is to have the context (with all beans loaded and initialized) available inside the spouts and bolts, and that means inside every running instance of the spouts and bolts, which may be running on different machines and different JVMs.
>
> I agree that when defining the topology I don't need the Spring context, as I just have to define spouts and bolts there. I used the context here to pass it to the spouts and bolts through their constructors, but it appears from the comments above that this won't work on a distributed cluster.
>
> So, is there some way that once the topology gets submitted to run in a distributed cluster, I can initialize my context there and somehow make it available to all spouts and bolts? Basically, some shared location where my ApplicationContext can be initialized (once and only once) and then accessed by all instances of the spouts and bolts?
>
> Thanks
>
> On Sun, Oct 11, 2015 at 11:20 AM, Ravi Sharma <[email protected]> wrote:
>
>> Basically you will have two contexts, defined at different times/phases.
>>
>> When you are about to submit the topology, you need to build the topology; that context only needs information about spouts and bolts. You don't need any application beans such as database accessors or your services, because at this level you are not running your application, you are just creating a topology and defining how bolts and spouts are connected to each other.
>>
>> Once the topology is submitted, it is moved to one of the supervisor nodes and starts running; all spouts and bolts are initialized, and at this moment you need your application context, which doesn't need your earlier topology context.
>>
>> So I would suggest keeping both contexts separate.
>>
>> A topology is not complex to build; a smaller topology can be built via code only, i.e. which bolt listens to which spout. But if you want a good design, just write a small wrapper that reads some JSON where you define your bolts and spouts, and use that to build the topology (you can use Spring, but it's not really needed).
>>
>> In the past I have done it with both a JSON setup (without Spring) and an XML setup (with Spring); both work well.
>>
>> Ravi
>>
>> On 11 Oct 2015 06:38, "Ankur Garg" <[email protected]> wrote:
>>
>>> Oh. The problem here is that I have many beans which need to be initialized (some read configuration from YML files, database connections, thread pool initialization, etc.).
>>>
>>> Now, I have written a Spring Boot application which takes care of all of the above, and I define my topology inside one of the beans. Here is my bean:
>>>
>>> @Autowired
>>> ApplicationContext appContext;
>>>
>>> @Bean
>>> public void submitTopology() throws AlreadyAliveException, InvalidTopologyException {
>>>     TopologyBuilder builder = new TopologyBuilder();
>>>     builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>     builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
>>>     builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");
>>>
>>>     Config conf = new Config();
>>>     conf.registerSerialization(EventBean.class); // to be registered with Kryo for Storm
>>>     conf.registerSerialization(InputQueueManagerImpl.class);
>>>     conf.setDebug(true);
>>>     conf.setMessageTimeoutSecs(200);
>>>
>>>     LocalCluster cluster = new LocalCluster();
>>>     cluster.submitTopology("test", conf, builder.createTopology());
>>> }
>>>
>>> When this bean is initialized, appContext has already been initialized by my Spring Boot application. So the point is: I am using Spring Boot to initialize and load my context with all beans.
>>>
>>> Now this is the context which I want to leverage in my spouts and bolts.
>>>
>>> So, if what I suggested earlier does not work on a distributed Storm cluster, I need to find a way of initializing my AppContext somehow :(
>>>
>>> I would be really thankful if anyone here can help me :(
>>>
>>> Thanks
>>>
>>> Ankur
>>>
>>> On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <[email protected]> wrote:
>>>
>>>> The local cluster runs completely within a single JVM AFAIK. The local cluster is useful for development, testing your topology, etc. The real deployment has to go through Nimbus and run on workers started by supervisors on one or more nodes, etc. It is kind of difficult to simulate all of that on a single box.
>>>>
>>>> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <[email protected]> wrote:
>>>>
>>>>> Oh... so I will have to test it in a cluster.
>>>>>
>>>>> Having said that, how is the local cluster we use so different from a normal cluster? Ideally, it should simulate a normal cluster.
>>>>>
>>>>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <[email protected]> wrote:
>>>>>
>>>>>> Hi Ankur,
>>>>>>
>>>>>> Locally it may be working, but it won't work in an actual cluster.
>>>>>>
>>>>>> Think of the Spring context as a collection of many of your resources: database connections, maybe HTTP connections, thread pools, etc. These things won't get serialised, shipped to other machines, and simply start working there.
>>>>>>
>>>>>> So basically, in the init methods of your bolts and spouts, you need to call some singleton class like this:
>>>>>>
>>>>>> ApplicationContext ac = SingletonApplicationContext.getContext();
>>>>>>
>>>>>> SingletonApplicationContext will have a static ApplicationContext variable, and in getContext you check whether the static variable has been initialised; if not, you initialise it, and then return it (a normal singleton class).
>>>>>>
>>>>>> Now when the topology moves to any other node, the bolts and spouts will start, the first init call will initialize the context, and the other bolts/spouts will just use it.
>>>>>>
>>>>>> As John mentioned, it is very important to mark all Spring beans and the context as transient.
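>>>>>>
>>>>>> Roughly, the holder could look like the sketch below (illustration only; StormSpringApp is a placeholder for whatever @SpringBootApplication class builds your context):
>>>>>>
>>>>>> import org.springframework.boot.SpringApplication;
>>>>>> import org.springframework.context.ApplicationContext;
>>>>>>
>>>>>> public class SingletonApplicationContext {
>>>>>>
>>>>>>     // one context per JVM (i.e. per worker); never serialised with the spouts/bolts
>>>>>>     private static volatile ApplicationContext context;
>>>>>>
>>>>>>     private SingletonApplicationContext() {
>>>>>>     }
>>>>>>
>>>>>>     public static ApplicationContext getContext() {
>>>>>>         if (context == null) {
>>>>>>             synchronized (SingletonApplicationContext.class) {
>>>>>>                 if (context == null) {
>>>>>>                     // placeholder: replace StormSpringApp with your own Spring Boot application class
>>>>>>                     context = SpringApplication.run(StormSpringApp.class);
>>>>>>                 }
>>>>>>             }
>>>>>>         }
>>>>>>         return context;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> The first spout or bolt whose open()/prepare() calls getContext() on a worker pays the start-up cost; every other spout and bolt in that JVM reuses the same context.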
>>>>>>
>>>>>> Hope it helps.
>>>>>>
>>>>>> Ravi.
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Javier,
>>>>>>>
>>>>>>> I am using a local cluster on my dev machine, where I am using Eclipse. Here, I am passing Spring's ApplicationContext as a constructor argument to the spouts and bolts:
>>>>>>>
>>>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
>>>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");
>>>>>>>
>>>>>>> Config conf = new Config();
>>>>>>> conf.registerSerialization(EventBean.class);
>>>>>>> conf.registerSerialization(InputQueueManagerImpl.class);
>>>>>>> conf.setDebug(true);
>>>>>>>
>>>>>>> LocalCluster cluster = new LocalCluster();
>>>>>>> cluster.submitTopology("test", conf, builder.createTopology());
>>>>>>>
>>>>>>> And in my spouts and bolts I make the ApplicationContext variable static, so when the topology is launched by cluster.submitTopology my context is still available:
>>>>>>>
>>>>>>> private static ApplicationContext ctx;
>>>>>>>
>>>>>>> public RabbitListnerSpout(ApplicationContext appContext) {
>>>>>>>     LOG.info("RabbitListner Constructor called");
>>>>>>>     ctx = appContext;
>>>>>>> }
>>>>>>>
>>>>>>> @SuppressWarnings("rawtypes")
>>>>>>> @Override
>>>>>>> public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>>>>>>>     LOG.info("Inside the open Method for RabbitListner Spout");
>>>>>>>     inputManager = (InputQueueManagerImpl) ctx.getBean(InputQueueManagerImpl.class);
>>>>>>>     notificationManager = (NotificationQueueManagerImpl) ctx.getBean(NotificationQueueManagerImpl.class);
>>>>>>>     eventExchange = ctx.getEnvironment().getProperty("input.rabbitmq.events.exchange");
>>>>>>>     routingKey = ctx.getEnvironment().getProperty("input.rabbitmq.events.routingKey");
>>>>>>>     eventQueue = ctx.getEnvironment().getProperty("input.rabbitmq.events.queue");
>>>>>>>     _collector = collector;
>>>>>>>     LOG.info("Exiting the open Method for RabbitListner Spout");
>>>>>>> }
>>>>>>>
>>>>>>> This is working like a charm (my ApplicationContext is initialized separately). As we all know, ApplicationContext is not serializable, yet this works well in a LocalCluster.
>>>>>>>
>>>>>>> My assumption is that it will work in a separate cluster too. Is my assumption correct?
>>>>>>>
>>>>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <[email protected]> wrote:
>>>>>>>
>>>>>>>> IIRC, only if everything you use in your spouts and bolts is serializable.
>>>>>>>>
>>>>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Ravi,
>>>>>>>>>
>>>>>>>>> I was able to make the integration with Spring work, but the problem is that I have to autowire every bolt and spout; that means even if I parallelize a spout or bolt, this has to happen for each instance.
>>>>>>>>> Is there some way that I only have to do this once for the bolts and spouts (I mean, if I parallelize bolts or spouts individually, can they share the configuration from somewhere)? Is this possible?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Ankur
>>>>>>>>>
>>>>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, this works for annotations too.
>>>>>>>>>>
>>>>>>>>>> You can call this method in the prepare() method of every bolt and the open() method of every spout; just make sure you don't use any autowired bean before this call.
>>>>>>>>>>
>>>>>>>>>> Ravi.
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Ravi,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply. I am using annotation-based configuration and Spring Boot.
>>>>>>>>>>>
>>>>>>>>>>> Any idea how to do it using annotations?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Bolts and spouts are created by Storm and are not known to the Spring context. You need to add them to the Spring context manually; there are a few methods available, e.g.:
>>>>>>>>>>>>
>>>>>>>>>>>> SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this, AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>>>>>>>>>>>
>>>>>>>>>>>> SpringContext is my own class where I have injected the Spring context, so SpringContext.getContext() returns the actual Spring context.
>>>>>>>>>>>>
>>>>>>>>>>>> Ravi.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am building a Storm topology with a set of spouts and bolts, and I am also using Spring for dependency injection.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately, none of my fields are getting autowired, even though I have declared all my spouts and bolts as @Components.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, in the place where I declare my topology, Spring is working fine.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is it because cluster.submitTopology("test", conf, builder.createTopology()) submits the topology to a cluster (locally it spawns different threads for the spouts and bolts) that autowiring is not working?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest.
>>>>
>>>> --
>>>> Javier González Nicolini
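
PS: to make Ravi's prepare()/open() suggestion above concrete, here is roughly what one of my bolts (GroupingBolt, for example) would look like if I switch to that approach. This is only a sketch: SpringContext is the holder class he mentions (my SingletonApplicationContext would work the same way), and InputQueueManagerImpl is just one of my beans used as an example.

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class GroupingBolt extends BaseRichBolt {

    // transient so nothing Spring-related is serialized along with the bolt
    @Autowired
    private transient InputQueueManagerImpl inputManager;

    private OutputCollector collector;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // re-wire this already-constructed bolt instance on whichever worker JVM it lands on
        SpringContext.getContext().getAutowireCapableBeanFactory()
                .autowireBeanProperties(this, AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // inputManager is safe to use here, after prepare() has run
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}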
