Hi, all,
We use Flume to collect various data and have implemented a customized sink
plugin to process it.
We have a question: can we do work similar to sink.process() inside the
sink.stop() implementation, i.e. try to take and process events from the
channel in the stop() function?
The reason is that uncontrollable interruptions can happen in a real
production environment, and if the data being transferred by Flume is
interrupted, we have to redo all the work from the beginning.
(PS: we can stop a collection job when we receive an event with an end tag in
its headers.)
That is why we want to try to process events when stop() is called.
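For context, the end-tag mechanism is roughly the following (a simplified
sketch, not our actual code; the "endTag" header key and the
finishCollection()/writeEvent() helpers are illustrative names only):

// Simplified sketch of process() in our sink (which extends AbstractSink).
@Override
public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    txn.begin();
    try {
        Event event = channel.take();
        if (event == null) {
            txn.commit();
            return Status.BACKOFF; // nothing available right now
        }
        if ("true".equals(event.getHeaders().get("endTag"))) {
            finishCollection(); // illustrative: close the current collection job
        } else {
            writeEvent(event);  // illustrative: hand the event to our processor
        }
        txn.commit();
        return Status.READY;
    } catch (Exception ex) {
        txn.rollback();
        throw new EventDeliveryException("Failed to process event", ex);
    } finally {
        txn.close();
    }
}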
In our tests, when we send an interruption such as a keyboard interrupt
(Ctrl+C) and there are a lot of events in the channel (think of a channel with
a large capacity that has cached too many events), sink.stop() can take the
events, but it may take a long time to finish all the processing and cleanup
work.
If the source is still able to put events into the channel, the drain loop may
never terminate, which is effectively a deadlock. (A bounded variant is
sketched after the stop() function below.)
Is this a bad solution? Are there any side effects to this design? Any
comments or suggestions are welcome.
Here is my stop() function:
public void stop() {
    Channel channel = getChannel();
    if (channel != null) {
        try {
            // Drain the channel: keep calling process() until it reports
            // BACKOFF, i.e. no more events could be taken.
            while (Status.BACKOFF != process()) {
                // keep draining
            }
        } catch (EventDeliveryException e) {
            logger.error("Parquet Sink " + getName() + ": Unable to process event. "
                + "Exception follows.", e);
        }
    }
    processor.destroy();
    processor = null; // prevent any further calls to the processor
    super.stop();
}
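
For example, would it be better to bound the drain in stop(), roughly like
this? (Just a sketch; DRAIN_DEADLINE_MS and drainWithDeadline() are
illustrative names, and the 30-second limit is arbitrary.)

// Bound how long stop() keeps draining, so it cannot loop forever
// while the source is still putting events into the channel.
private static final long DRAIN_DEADLINE_MS = 30000;

private void drainWithDeadline() throws EventDeliveryException {
    long deadline = System.currentTimeMillis() + DRAIN_DEADLINE_MS;
    while (System.currentTimeMillis() < deadline) {
        if (Status.BACKOFF == process()) {
            break; // channel reported no more events
        }
    }
}

Events left in the channel after the deadline would then stay there (for a
durable channel such as the file channel) instead of blocking shutdown.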
Regards,
________________________________
jackeylv