I think I don't need to Autowire beans inside my spout and bolts . All I want my context to be available . Since I use Spring Boot , I am delegating it to initialise all the beans and set up every bean (reading yml file and create DB connections , connections to Message brokers etc ) .
On my local cluster I am passing it as a constructor argument to Spouts and Bolts . Since all r running in same jvm its available to all spouts and bolts . But in a distributed cluster , this will blow up as Context is not serializable and cannot be passed like above . So the problem is only to make this context available once per jvm . Hence I thought I will wrap it under a singleton and make this available to all spouts and bolts per jvm. Once I have this context initialized and loaded all I need to do is to get the bean which I will do the same way I am doing inside local cluster spouts and bolts . On Sun, Oct 11, 2015 at 12:46 PM, Ravi Sharma <[email protected]> wrote: > Yes ur assumption is right > Jvm1 will create application contexts say ac1 > > And jvm2 will create another application instance ac2 > > And all of it can be done via singleton classes. > > All bolts and spouts in same jvm instance need to access same application > context. > > I have done same in cluster and it works > > Remember all spring beans need to be transient and also u need to set > required=false in case u r going create spout and bolt using spring > > Public class mybolt { > @aurowired(required=false) > Private transient MyServiceBean myServiceBean; > > .... > ... > } > > Ravi > On 11 Oct 2015 07:59, "Ankur Garg" <[email protected]> wrote: > >> Also , I think there can be some instances of spouts/bolts running on JVM >> 1 and some on JVM 2 and so on... >> >> Is it possible for spouts and bolts running on same jvm to access same >> applicationContext . >> >> I am thinking that I can make the place where I launch my spring Boot >> application inside a singleton class , and so all the spouts and bolts >> running on say JVM1 will have access to same context (instead of launching >> it in all spouts and bolts) . And for those in JVM 2 they will still >> initialise it once and all the rest will get the same application Context . >> >> But all above is theoretical assumption . I still need to try it out >> (unfortunately i dont have a cluster setup at my end) but if possible >> please let me know if this can work . >> >> Thanks >> Ankur >> >> On Sun, Oct 11, 2015 at 11:48 AM, Ankur Garg <[email protected]> >> wrote: >> >>> Thanks for replying Ravi . >>> >>> I think your suggestion to make wrapper to read json or xml is a very >>> nice Idea indeed . >>> >>> But , the problem for me here is to have the context (with all beans >>> loaded and initialized ) available inside the Spouts and Bolts and that >>> means inside every running instance of Spouts and Bolts which may be >>> running on different machines and different jvm. >>> >>> Agree that when defining topology I dont need Spring Context as I just >>> have to define spouts and bolts there. I used context here to send them to >>> spout and bolt through constructor but it appears from comments above that >>> it wont work on distributed cluster . >>> >>> So , is there some way that once topology gets submitted to run in a >>> distributed cluster , I can initialize my context there and someway they >>> are available to all Spouts and Bolts ..Basically some shared location >>> where my application Context can be initialized (once and only once) and >>> this context can be accessed by >>> all instances of Spouts and Bolts ? >>> >>> Thanks >>> >>> On Sun, Oct 11, 2015 at 11:20 AM, Ravi Sharma <[email protected]> >>> wrote: >>> >>>> Basically u will have two context defined at different time/phase >>>> >>>> When u r about to submit the topology, u need to build topology, that >>>> context only need information about spouts and bolts. You don't need any >>>> application bean like database accessories or ur services etc, as at this >>>> level u r not running ur application but u r just creating a topology and >>>> defining how bolts and spouts are connected to each other etc etc >>>> >>>> Now once topology is submitted, topology will be moved to one of the >>>> supervisor node and will start running, all spouts and bolts will be >>>> initialized, at this moment u will need ur application context, which >>>> doesn't need ur earlier topology context >>>> >>>> So I will suggest keep both context separate. >>>> >>>> Topology is not complex to build, smaller topology can be built via >>>> code only, I. E. Which bolt listening to which spout, but if u want to go >>>> with good design, I say just write a small wrapper to read some json where >>>> u can define ur bolts and spouts and use that to build topology (u can use >>>> spring but it's not much needed) >>>> >>>> In past I have done it using both json setting (without spring) and xml >>>> setting (with spring) both works good >>>> >>>> Ravi >>>> On 11 Oct 2015 06:38, "Ankur Garg" <[email protected]> wrote: >>>> >>>>> Oh The problem here is I have many beans and which need to be >>>>> initialized (some are reading conf from yml files , database connection , >>>>> thread pool initialization etc) . >>>>> >>>>> >>>>> Now , I have written a spring boot application which takes care of all >>>>> the above and I define my topology inside one of the beans , Here is my >>>>> bean >>>>> >>>>> @Autowired >>>>> ApplicationContext appContext; >>>>> >>>>> @Bean >>>>> public void submitTopology() throws >>>>> AlreadyAliveException,InvalidTopologyException { >>>>> >>>>> TopologyBuilder builder = new TopologyBuilder(); >>>>> >>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), >>>>> 10); >>>>> >>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), >>>>> 10).shuffleGrouping("rabbitMqSpout"); >>>>> >>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), >>>>> 10).shuffleGrouping("mapBolt"); >>>>> >>>>> Config conf = new Config(); >>>>> >>>>> conf.registerSerialization(EventBean.class); // To be registered with >>>>> Kyro for Storm >>>>> >>>>> conf.registerSerialization(InputQueueManagerImpl.class); >>>>> >>>>> conf.setDebug(true); >>>>> >>>>> conf.setMessageTimeoutSecs(200); >>>>> >>>>> LocalCluster cluster = new LocalCluster(); >>>>> >>>>> cluster.submitTopology("test", conf, builder.createTopology()); >>>>> >>>>> } >>>>> >>>>> >>>>> When this bean is initialized , I already have appContext initialized >>>>> by my Spring Boot Application . So , the thing is , I am using SpringBoot >>>>> to initialize and load my context with all beans . >>>>> >>>>> Now this is the context which I want to leverage in my spouts and >>>>> bolts . >>>>> >>>>> So , if what I suggested earlier does not work on Storm Distributed >>>>> Cluster , I need to find a way of initializing my AppContext somehow:( >>>>> >>>>> I would be really thankful if anyone here can help me :( >>>>> >>>>> >>>>> Thanks >>>>> >>>>> Ankur >>>>> >>>>> On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <[email protected]> >>>>> wrote: >>>>> >>>>>> The local cluster runs completely within a single JVM AFAIK. The >>>>>> local cluster is useful for development, testing your topology, etc. The >>>>>> real deployment has to go through nimbus, run on workers started by >>>>>> supervisors on one or more nodes, etc. Kind of difficult to simulate all >>>>>> that on a single box. >>>>>> >>>>>> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Oh ...So I will have to test it in a cluster. >>>>>>> >>>>>>> Having said that, how is local cluster which we use is too different >>>>>>> from normal cluster.. Ideally ,it shud simulate normal cluster.. >>>>>>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <[email protected]> wrote: >>>>>>> >>>>>>>> Hi Ankur, >>>>>>>> local it may be working but It wont work in Actual cluster. >>>>>>>> >>>>>>>> Think about SpringContext is collection of your so many resoucres, >>>>>>>> like Database connections , may be HTTP connections , Thread pools etc. >>>>>>>> These things wont get serialised and just go to other machines and >>>>>>>> start working. >>>>>>>> >>>>>>>> SO basically in init methods of bolt and spout, you need to call >>>>>>>> some singloton class like this >>>>>>>> >>>>>>>> ApplicationContext ac = SingletonApplicationContext.getContext(); >>>>>>>> >>>>>>>> SingletonApplicationContext will have a static variable >>>>>>>> ApplicationContext and in getContext you will check if static variable >>>>>>>> has >>>>>>>> been initialised if not then u will initilize it, and then return >>>>>>>> it(normal >>>>>>>> Singleton class) >>>>>>>> >>>>>>>> >>>>>>>> Now when Topolgy will move to any other node, Bolt and spouts will >>>>>>>> start and first init call will initialize it and other bolt/spouts will >>>>>>>> just use that. >>>>>>>> >>>>>>>> As John mentioned, its very important to mark all Spring beans and >>>>>>>> Context as transient. >>>>>>>> >>>>>>>> Hope it helps. >>>>>>>> >>>>>>>> Ravi. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Javier , >>>>>>>>> >>>>>>>>> So , I am using a Local cluster on my dev machine where I am using >>>>>>>>> Eclipse . Here , I am passing Springs ApplicationContext as >>>>>>>>> constructor >>>>>>>>> argument to spouts and bolts . >>>>>>>>> >>>>>>>>> TopologyBuilder builder = new TopologyBuilder(); >>>>>>>>> >>>>>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout( >>>>>>>>> appContext), 10); >>>>>>>>> >>>>>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), >>>>>>>>> 10).shuffleGrouping("rabbitMqSpout"); >>>>>>>>> >>>>>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), >>>>>>>>> 10).shuffleGrouping("mapBolt"); >>>>>>>>> >>>>>>>>> Config conf = new Config(); >>>>>>>>> >>>>>>>>> conf.registerSerialization(EventBean.class); / >>>>>>>>> >>>>>>>>> conf.registerSerialization(InputQueueManagerImpl.class); >>>>>>>>> >>>>>>>>> conf.setDebug(true); >>>>>>>>> >>>>>>>>> LocalCluster cluster = new LocalCluster(); >>>>>>>>> >>>>>>>>> cluster.submitTopology("test", conf, builder.createTopology()); >>>>>>>>> >>>>>>>>> >>>>>>>>> And in my spouts and Bolts , >>>>>>>>> >>>>>>>>> I make my Application Context variable as static . So when it is >>>>>>>>> launched by c;uster.submitTopology , my context is still avalilable >>>>>>>>> >>>>>>>>> >>>>>>>>> private static ApplicationContext ctx; >>>>>>>>> >>>>>>>>> public RabbitListnerSpout(ApplicationContext appContext) { >>>>>>>>> >>>>>>>>> LOG.info("RabbitListner Constructor called"); >>>>>>>>> >>>>>>>>> ctx = appContext; >>>>>>>>> >>>>>>>>> } >>>>>>>>> >>>>>>>>> >>>>>>>>> @SuppressWarnings("rawtypes") >>>>>>>>> >>>>>>>>> @Override >>>>>>>>> >>>>>>>>> public void open(Map conf, TopologyContext >>>>>>>>> context,SpoutOutputCollector >>>>>>>>> collector) { >>>>>>>>> >>>>>>>>> LOG.info("Inside the open Method for RabbitListner Spout"); >>>>>>>>> >>>>>>>>> inputManager = (InputQueueManagerImpl) ctx >>>>>>>>> .getBean(InputQueueManagerImpl.class); >>>>>>>>> >>>>>>>>> notificationManager = (NotificationQueueManagerImpl) ctx >>>>>>>>> .getBean(NotificationQueueManagerImpl.class); >>>>>>>>> >>>>>>>>> eventExchange = ctx.getEnvironment().getProperty( >>>>>>>>> "input.rabbitmq.events.exchange"); >>>>>>>>> >>>>>>>>> routingKey = ctx.getEnvironment().getProperty( >>>>>>>>> "input.rabbitmq.events.routingKey"); >>>>>>>>> >>>>>>>>> eventQueue = ctx.getEnvironment().getProperty( >>>>>>>>> "input.rabbitmq.events.queue"); >>>>>>>>> >>>>>>>>> _collector = collector; >>>>>>>>> >>>>>>>>> LOG.info("Exiting the open Method for RabbitListner Spout"); >>>>>>>>> >>>>>>>>> } >>>>>>>>> >>>>>>>>> >>>>>>>>> This is working like a charm (my ApplicationContext is initialized >>>>>>>>> seperately ) . As we all know , ApplicationContext is not >>>>>>>>> serializable . >>>>>>>>> But this works well in LocalCluster. >>>>>>>>> >>>>>>>>> My assumption is that it will work in a seperate Cluster too . Is >>>>>>>>> my assumption correct ?? >>>>>>>>> >>>>>>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> IIRC, only if everything you use in your spouts and bolts is >>>>>>>>>> serializable. >>>>>>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Ravi , >>>>>>>>>>> >>>>>>>>>>> I was able to make an Integration with Spring but the problem is >>>>>>>>>>> that I have to autowire for every bolt and spout . That means that >>>>>>>>>>> even if >>>>>>>>>>> i parallelize spout and bolt it will get started to each instance >>>>>>>>>>> . Is >>>>>>>>>>> there some way that I only have to do for bolts and spouts once (I >>>>>>>>>>> mean if >>>>>>>>>>> I parallelize bolts or spouts individually it can share the conf >>>>>>>>>>> from >>>>>>>>>>> somewhere) . IS this possible?? >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> Ankur >>>>>>>>>>> >>>>>>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Yes this is for annotation also... >>>>>>>>>>>> >>>>>>>>>>>> you can call this method in prepare() method of bolt and >>>>>>>>>>>> onOpen() method >>>>>>>>>>>> in every Spout and make sure you don't use any autowire bean >>>>>>>>>>>> before this >>>>>>>>>>>> call. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Ravi. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>> > Hi Ravi , >>>>>>>>>>>> > >>>>>>>>>>>> > Thanks for your reply . I am using annotation based >>>>>>>>>>>> configuration and using >>>>>>>>>>>> > Spring Boot. >>>>>>>>>>>> > >>>>>>>>>>>> > Any idea how to do it using annotations ? >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> > >>>>>>>>>>>> > > Bolts and Spouts are created by Storm and not known to >>>>>>>>>>>> Spring Context. >>>>>>>>>>>> > You >>>>>>>>>>>> > > need to manually add them to SpringContext, there are few >>>>>>>>>>>> methods >>>>>>>>>>>> > available >>>>>>>>>>>> > > i.e. >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > >>>>>>>>>>>> SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this, >>>>>>>>>>>> > > AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false); >>>>>>>>>>>> > > >>>>>>>>>>>> > > SpringContext is my own class where i have injected >>>>>>>>>>>> SpringContext so >>>>>>>>>>>> > > SpringContext.getContext() returns the actuall Spring >>>>>>>>>>>> Context >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > Ravi. >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg < >>>>>>>>>>>> [email protected]> >>>>>>>>>>>> > wrote: >>>>>>>>>>>> > > >>>>>>>>>>>> > > > Hi , >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > I am building a Storm topology with set of Spouts and >>>>>>>>>>>> Bolts and also >>>>>>>>>>>> > > using >>>>>>>>>>>> > > > Spring for Dependency Injection . >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > Unfortunately , none of my fields are getting autowired >>>>>>>>>>>> even though I >>>>>>>>>>>> > > have >>>>>>>>>>>> > > > declared all my spouts and Bolts as @Components . >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > However the place where I am declaring my topology , >>>>>>>>>>>> Spring is working >>>>>>>>>>>> > > fine >>>>>>>>>>>> > > > . >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > Is it because cluster.submitTopology("test", conf, >>>>>>>>>>>> > > > builder.createTopology()) >>>>>>>>>>>> > > > submits the topology to a cluster (locally it spawns >>>>>>>>>>>> different thread >>>>>>>>>>>> > > for >>>>>>>>>>>> > > > Spouts and Bolts) that Autowiring is not working? >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > Please suggest . >>>>>>>>>>>> > > > >>>>>>>>>>>> > > >>>>>>>>>>>> > >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Javier González Nicolini >>>>>> >>>>> >>>>> >>> >>
