Hiroyuki, Excellent find!! I agree that start() and stop() are not called and should be. That change is in:
https://issues.apache.org/jira/browse/FLUME-1121 However, if that does progress a separate JIRA might. Brock On Mon, Apr 23, 2012 at 4:02 AM, alo alt <[email protected]> wrote: > HI, > > thank you for contributing. > Please file a jira about the wanted change > (https://issues.apache.org/jira/secure/Dashboard.jspa) and submit the patch > against https://reviews.apache.org. Here please select flume-git as project > and add the jira-ID (FLUME-) in the bug field. Then will be the patch > automatically added to your jira. > > thanks, > Alex > > -- > Alexander Lorenz > http://mapredit.blogspot.com > > On Apr 23, 2012, at 12:55 PM, Hiroyuki Kakine wrote: > >> Hi there, >> >> I'm Kakine from IIJ. I'm a new to here. Nice to meet you. >> >> I expected Channel's start() and stop() are performed, because Channel >> is LifecycleAware. But as far as my investigation of the source code, >> none seems to call start() and stop() of it. >> >> Below patch is to add calling Channel's start() and stop() on the node >> configuration changes. Do you think this change is ok to go in? >> >> Thanks, >> >> Index: >> flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java >> =================================================================== >> --- >> flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java >> (revision 1325150) >> +++ >> flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java >> (working copy) >> @@ -21,6 +21,7 @@ >> >> import java.util.Map.Entry; >> >> +import org.apache.flume.Channel; >> import org.apache.flume.SinkRunner; >> import org.apache.flume.SourceRunner; >> import org.apache.flume.lifecycle.LifecycleAware; >> @@ -73,6 +74,15 @@ >> logger.error("Error while stopping {}", entry.getValue(), e); >> } >> } >> + >> + for (Entry<String, Channel> entry : nodeConfiguration >> + .getChannels().entrySet()) { >> + try{ >> + nodeSupervisor.unsupervise(entry.getValue()); >> + } catch (Exception e) { >> + logger.error("Error while stopping {}", entry.getValue(), e); >> + } >> + } >> } >> >> this.nodeConfiguration = nodeConfiguration; >> @@ -95,6 +105,16 @@ >> logger.error("Error while starting {}", entry.getValue(), e); >> } >> } >> + >> + for (Entry<String, Channel> entry : nodeConfiguration >> + .getChannels().entrySet()) { >> + try{ >> + nodeSupervisor.supervise(entry.getValue(), >> + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); >> + } catch (Exception e) { >> + logger.error("Error while starting {}", entry.getValue(), e); >> + } >> + } >> } >> >> @Override >> --------------------------- >> Hiroyuki Kakine <[email protected]> >> Internet Initiative Japan Inc. >> > -- Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
