[ https://issues.apache.org/jira/browse/HADOOP-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12697234#action_12697234 ]
Todd Lipcon commented on HADOOP-5640:
-------------------------------------
{quote}
Another approach would be to come up with an asynchronous publish/subscribe
kind of model. The namenode/datanode could write data to this channel without
waiting for the consumer(s) to pick it up. It could be similar to a
file-change-log, but would also contain internal state changes of dfs modules.
Thoughts?
{quote}
I like the idea of an async pub/sub model. A couple options for how we might
implement this:
a) We have a single LinkedBlockingQueue attached to the service. When an event
happens, the service enqueues it. We have a PluginDispatcher instance whose
thread does something like:
{code:java}
try {
  while (true) {
    Event e = queue.take();   // blocks until the service enqueues an event
    for (Plugin p : plugins) {
      p.handleEvent(e);
    }
  }
} catch (InterruptedException ie) {
  // dispatcher is being stopped
}
{code}
Events are dispatched from the service by simply calling
dispatcher.enqueueEvent(foo). We also delegate plugin registration/start/stop
to the PluginDispatcher.
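To make that concrete, here is a minimal sketch of what option (a)'s
dispatcher could look like. The Event and Plugin placeholder types and the
registerPlugin name are assumptions for illustration, not existing Hadoop API:
{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/* placeholder types standing in for whatever we end up defining */
class Event {}
interface Plugin { void handleEvent(Event e); }

class PluginDispatcher {
  private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
  private final List<Plugin> plugins = new CopyOnWriteArrayList<Plugin>();
  private final Thread dispatchThread = new Thread(new Runnable() {
    public void run() {
      try {
        while (true) {
          Event e = queue.take();
          for (Plugin p : plugins) {
            try {
              p.handleEvent(e);
            } catch (Throwable t) {
              /* a misbehaving plugin must not kill the dispatcher */
            }
          }
        }
      } catch (InterruptedException ie) {
        /* stop() interrupts us; exit the loop */
      }
    }
  });

  /* called from the service thread; never blocks */
  void enqueueEvent(Event e) {
    queue.offer(e);
  }

  void registerPlugin(Plugin p) { plugins.add(p); }
  void start() { dispatchThread.start(); }
  void stop()  { dispatchThread.interrupt(); }
}
{code}
The unbounded queue means enqueueEvent never blocks the service; if memory is
a concern, a bounded queue with a drop-oldest policy would be the obvious
refinement.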
b) We have a LinkedBlockingQueue per plugin. Each plugin is responsible for
creating a thread that runs a loop like the one above, taking events off its
own queue. Dispatch from the service then looks like:
{code:java}
for (Plugin p : plugins) {
  p.enqueueEvent(foo);
}
{code}
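In that version each plugin owns its queue and consumer thread; a base class
could factor the loop out. A sketch, where QueueingPlugin is just an
illustrative name and Event is the placeholder type above:
{code:java}
import java.util.concurrent.LinkedBlockingQueue;

/* hypothetical base class for option (b) */
abstract class QueueingPlugin {
  private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
  private final Thread consumer = new Thread(new Runnable() {
    public void run() {
      try {
        while (true) {
          handleEvent(queue.take());
        }
      } catch (InterruptedException ie) {
        /* stop() interrupts us */
      }
    }
  });

  /* called from the service thread; never blocks */
  public void enqueueEvent(Event e) {
    queue.offer(e);
  }

  public void start() { consumer.start(); }
  public void stop()  { consumer.interrupt(); }

  protected abstract void handleEvent(Event e);
}
{code}
The tradeoff versus (a) is one thread per plugin instead of one thread total,
but a slow plugin then only delays its own queue.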
Both of the options above are a little ugly in that they require a class for
each type of event that can be handled, and they introduce a
handleEvent(PluginEvent e) method in the plugins, likely with an ugly switch
statement. With my functional programmer hat on, I'd personally prefer
something like:
{code:java}
/* does this generic interface exist somewhere in hadoop yet? */
interface SingleArgumentCaller<T> {
  void call(T p);
}

/* in namenode: */
...
// we just heard about a new data node
dispatcher.enqueue(new SingleArgumentCaller<DatanodePlugin>() {
  public void call(DatanodePlugin p) { p.newDatanodeAppeared(...); }
});
...

interface DatanodePlugin {
  void newDatanodeAppeared(...);
  /* all the other hook points */
}

/* dispatcher looks like: */
class PluginDispatcher<T> {
  private LinkedBlockingQueue<SingleArgumentCaller<T>> queue;
  private List<T> plugins;

  void run() {
    while (true) {
      SingleArgumentCaller<T> caller = queue.take(); /* plus InterruptedException handling */
      for (T plugin : plugins) {
        caller.call(plugin); /* plus some try..catch */
      }
    }
  }
}
{code}
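For completeness, the enqueue side of that dispatcher would just hand the
caller to the queue; a sketch:
{code:java}
/* called from the service thread; never blocks */
void enqueue(SingleArgumentCaller<T> caller) {
  queue.offer(caller);
}
{code}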
If no one has any strong objections, I'll go with the SingleArgumentCaller
route, since I think class-per-event proliferation here is a messy solution.
> Allow ServicePlugins to hook callbacks into key service events
> --------------------------------------------------------------
>
> Key: HADOOP-5640
> URL: https://issues.apache.org/jira/browse/HADOOP-5640
> Project: Hadoop Core
> Issue Type: Improvement
> Components: util
> Reporter: Todd Lipcon
>
> HADOOP-5257 added the ability for NameNode and DataNode to start and stop
> ServicePlugin implementations at NN/DN start/stop. However, this is
> insufficient integration for some common use cases.
> We should add some functionality for Plugins to subscribe to events generated
> by the service they're plugging into. Some potential hook points are:
> NameNode:
> - new datanode registered
> - datanode has died
> - exception caught
> - etc?
> DataNode:
> - startup
> - initial registration with NN complete (this is important for HADOOP-4707
> to sync up datanode.dnRegistration.name with the NN-side registration)
> - namenode reconnect
> - some block transfer hooks?
> - exception caught
> I see two potential routes for implementation:
> 1) We make an enum for the types of hook points and have a general function in
> the ServicePlugin interface. Something like:
> {code:java}
> enum HookPoint {
>   DN_STARTUP,
>   DN_RECEIVED_NEW_BLOCK,
>   DN_CAUGHT_EXCEPTION,
>   ...
> }
> void runHook(HookPoint hp, Object value);
> {code}
> 2) We make classes specific to each "pluggable" as was originally suggested
> in HADOOP-5257. Something like:
> {code:java}
> class DataNodePlugin {
>   void datanodeStarted() {}
>   void receivedNewBlock(block info, etc) {}
>   void caughtException(Exception e) {}
>   ...
> }
> {code}
> I personally prefer option (2) since we can ensure plugin API compatibility
> at compile-time, and we avoid an ugly switch statement in a runHook()
> function.
> Interested to hear what people's thoughts are here.