Hi there,
I'm Kakine from IIJ. I'm new here. Nice to meet you.
I expected Channel's start() and stop() to be called, because Channel
is LifecycleAware. But as far as I can tell from the source code,
nothing calls them.
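To illustrate the concern, here is a minimal sketch. It does not use the
real Flume classes: LifecycleAware is reduced to start()/stop(), and
RecordingChannel and SimpleManager are hypothetical stand-ins for a
channel and for the node manager. It only shows that a lifecycle-aware
component never sees start() or stop() unless something explicitly
manages it.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelLifecycleSketch {

  // Simplified stand-in for Flume's LifecycleAware (assumption: only
  // start() and stop() are modeled here).
  interface LifecycleAware {
    void start();
    void stop();
  }

  // Hypothetical channel that records which lifecycle calls it received.
  static class RecordingChannel implements LifecycleAware {
    final List<String> calls = new ArrayList<>();

    @Override
    public void start() {
      calls.add("start");
    }

    @Override
    public void stop() {
      calls.add("stop");
    }
  }

  // Hypothetical manager: it starts and stops only the components that
  // were explicitly handed to it.
  static class SimpleManager {
    private final List<LifecycleAware> supervised = new ArrayList<>();

    void supervise(LifecycleAware component) {
      supervised.add(component);
      component.start();
    }

    void unsuperviseAll() {
      for (LifecycleAware component : supervised) {
        component.stop();
      }
      supervised.clear();
    }
  }

  public static void main(String[] args) {
    RecordingChannel ignored = new RecordingChannel(); // never supervised
    RecordingChannel managed = new RecordingChannel();

    SimpleManager manager = new SimpleManager();
    manager.supervise(managed);
    manager.unsuperviseAll();

    System.out.println("ignored: " + ignored.calls); // prints "ignored: []"
    System.out.println("managed: " + managed.calls); // prints "managed: [start, stop]"
  }
}
```

The "ignored" channel corresponds to what I believe happens to channels
today: it implements the interface, but no one drives its lifecycle.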
The patch below adds calls to Channel's start() and stop() when the node
configuration changes. Do you think this change is OK to go in?
Thanks,
Index: flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
===================================================================
--- flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java	(revision 1325150)
+++ flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java	(working copy)
@@ -21,6 +21,7 @@
 import java.util.Map.Entry;
+import org.apache.flume.Channel;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleAware;
@@ -73,6 +74,15 @@
           logger.error("Error while stopping {}", entry.getValue(), e);
         }
       }
+
+      for (Entry<String, Channel> entry : this.nodeConfiguration
+          .getChannels().entrySet()) {
+        try{
+          nodeSupervisor.unsupervise(entry.getValue());
+        } catch (Exception e) {
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
+      }
     }
     this.nodeConfiguration = nodeConfiguration;
@@ -95,6 +105,16 @@
         logger.error("Error while starting {}", entry.getValue(), e);
       }
     }
+
+    for (Entry<String, Channel> entry : nodeConfiguration
+        .getChannels().entrySet()) {
+      try{
+        nodeSupervisor.supervise(entry.getValue(),
+            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e) {
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
+    }
   }
   @Override
---------------------------
Hiroyuki Kakine <[email protected]>
Internet Initiative Japan Inc.