Hi all,

We use Flume to collect various data, and we implemented a customized sink
plugin to process these data.

We have a question: can we do work similar to sink.process() inside the
sink.stop() implementation, i.e. try to take and process events from the
channel in the stop() function?
Because of possible uncontrollable interruptions in the production
environment, if the data transferred by Flume is interrupted, we have to redo
all the work from the beginning.
(PS: we can stop a collection job when an event arrives with an end tag in its
headers.)
That is why we want to try to process the remaining events when stop() is called.
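For clarity, the end-tag check inside our process() looks roughly like the
sketch below; the "end" header key and the isEndEvent name are just our own
convention for illustration, not anything Flume defines.

    // Called on each event taken from the channel in process().
    // The "end" header key is our own convention, not a Flume built-in.
    private boolean isEndEvent(org.apache.flume.Event event) {
        java.util.Map<String, String> headers = event.getHeaders();
        return headers != null && "true".equals(headers.get("end"));
    }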

In our tests, when we sent an interruption such as a keyboard interrupt
(Ctrl+C) and there were a lot of events in the channel (think of a channel
with a large capacity that has cached too many events), sink.stop() could
still take events, but it might spend a long time finishing all the processing
and cleanup work.
If the source is still able to put events into the channel, stop() may never
return, which effectively becomes a deadlock.

Is this a bad solution? Are there any side effects of this design? Any
comments or suggestions are welcome.

Here is my stop() function:

public void stop() {
    Channel channel = getChannel();
    if (channel != null) {
        try {
            // Drain the channel: keep calling process() until it reports
            // BACKOFF, i.e. there are no more events to take.
            while (process() != Status.BACKOFF) {
                // keep draining
            }
        } catch (EventDeliveryException e) {
            logger.error("Parquet Sink " + getName() + ": Unable to process event. "
                + "Exception follows.", e);
        }
    }
    processor.destroy();
    processor = null; // prevent further use of the processor
    super.stop();
}
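One idea we are considering to avoid the never-ending drain is to bound the
loop with a deadline, roughly as in the sketch below. This is untested; the
30-second limit is an arbitrary example, and any events still in the channel
when the deadline expires would of course be left unprocessed.

    public void stop() {
        Channel channel = getChannel();
        if (channel != null) {
            // Give up after a fixed deadline so stop() cannot block forever
            // while the source keeps adding events to the channel.
            long deadline = System.currentTimeMillis() + 30000L; // arbitrary 30s
            try {
                while (System.currentTimeMillis() < deadline
                        && process() != Status.BACKOFF) {
                    // keep draining until the channel is empty or time is up
                }
            } catch (EventDeliveryException e) {
                logger.error("Parquet Sink " + getName()
                    + ": Unable to process event. Exception follows.", e);
            }
        }
        processor.destroy();
        processor = null; // prevent further use of the processor
        super.stop();
    }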

Regards,
jackeylv
