[
https://issues.apache.org/jira/browse/FLUME-2772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ming-Lung Weng updated FLUME-2772:
----------------------------------
Description:
I found SpoolDirectorySource drop Event when channel(memory channel i used) is
full
at org.apache.flume.source.SpoolDirectorySource.run()
{code:title=Bar.java|borderStyle=solid}
while (!Thread.interrupted()) {
List<Event> events = reader.readEvents(batchSize);
if (events.isEmpty()) {
break;
}
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();
try {
getChannelProcessor().processEventBatch(events);
reader.commit();
} catch (ChannelException ex) {
logger.warn("The channel is full, and cannot write data now. The " +
"source will try again after " + String.valueOf(backoffInterval) +
" milliseconds");
hitChannelException = true;
if (backoff) {
TimeUnit.MILLISECONDS.sleep(backoffInterval);
backoffInterval = backoffInterval << 1;
backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
backoffInterval;
}
continue;
}
backoffInterval = 250;
sourceCounter.addToEventAcceptedCount(events.size());
sourceCounter.incrementAppendBatchAcceptedCount();
}
{code}
SpoolDirectorySource sleep a backoff time and re-calculate backoff time,
and continue to read next event DO NOTHING with the event that not put into
channel
There is no exception but only logger.warn that sound not important
This is a bug to me, but I'm not sure is this a bug
I use a simple while loop to fix this, just like
{code:title=Bar.java|borderStyle=solid}
boolean eventPushed = false;
while(!eventPushed) {
try {
getChannelProcessor().processEventBatch(events);
reader.commit();
eventPushed = true;
} catch (ChannelException ex) {
logger.warn("The channel is full, and cannot write data now.
The " +
"source will try again after " +
String.valueOf(backoffInterval) +
" milliseconds");
hitChannelException = true;
if (backoff) {
TimeUnit.MILLISECONDS.sleep(backoffInterval);
backoffInterval = backoffInterval << 1;
backoffInterval = backoffInterval >= maxBackoff ?
maxBackoff :
backoffInterval;
}
}
}
{code}
was:
I found SpoolDirectorySource drop Event when channel(memory channel i used) is
full
at org.apache.flume.source.SpoolDirectorySource.run()
{code:title=Bar.java|borderStyle=solid}
while (!Thread.interrupted()) {
List<Event> events = reader.readEvents(batchSize);
if (events.isEmpty()) {
break;
}
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();
try {
getChannelProcessor().processEventBatch(events);
reader.commit();
} catch (ChannelException ex) {
logger.warn("The channel is full, and cannot write data now. The " +
"source will try again after " + String.valueOf(backoffInterval) +
" milliseconds");
hitChannelException = true;
if (backoff) {
TimeUnit.MILLISECONDS.sleep(backoffInterval);
backoffInterval = backoffInterval << 1;
backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
backoffInterval;
}
continue;
}
backoffInterval = 250;
sourceCounter.addToEventAcceptedCount(events.size());
sourceCounter.incrementAppendBatchAcceptedCount();
}
{code}
SpoolDirectorySource sleep a backoff time and re-calculate backoff time,
and continue to read next event DO NOTHING with the event that not put into
channel
There is no exception but only logger.warn that sound not important
This is a bug to me, but I'm not sure is this a bug
I use a simple while loop to fix this, just like
{code:title=Bar.java|borderStyle=solid}
boolean eventPushed = false;
while(!eventPushed) {
try {
getChannelProcessor().processEventBatch(events);
reader.commit();
eventPushed = true;
} catch (ChannelException ex) {
logger.warn("The channel is full, and cannot write data now.
The " +
"source will try again after " +
String.valueOf(backoffInterval) +
" milliseconds");
hitChannelException = true;
if (backoff) {
TimeUnit.MILLISECONDS.sleep(backoffInterval);
backoffInterval = backoffInterval << 1;
backoffInterval = backoffInterval >= maxBackoff ?
maxBackoff :
backoffInterval;
}
}
continue;
}
{code}
> SpoolDirectorySource drop event while the channel is full, with no any
> exception
> --------------------------------------------------------------------------------
>
> Key: FLUME-2772
> URL: https://issues.apache.org/jira/browse/FLUME-2772
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.6
> Reporter: Ming-Lung Weng
>
> I found SpoolDirectorySource drop Event when channel(memory channel i used)
> is full
> at org.apache.flume.source.SpoolDirectorySource.run()
> {code:title=Bar.java|borderStyle=solid}
> while (!Thread.interrupted()) {
> List<Event> events = reader.readEvents(batchSize);
> if (events.isEmpty()) {
> break;
> }
> sourceCounter.addToEventReceivedCount(events.size());
> sourceCounter.incrementAppendBatchReceivedCount();
> try {
> getChannelProcessor().processEventBatch(events);
> reader.commit();
> } catch (ChannelException ex) {
> logger.warn("The channel is full, and cannot write data now. The
> " +
> "source will try again after " +
> String.valueOf(backoffInterval) +
> " milliseconds");
> hitChannelException = true;
> if (backoff) {
> TimeUnit.MILLISECONDS.sleep(backoffInterval);
> backoffInterval = backoffInterval << 1;
> backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
> backoffInterval;
> }
> continue;
> }
> backoffInterval = 250;
> sourceCounter.addToEventAcceptedCount(events.size());
> sourceCounter.incrementAppendBatchAcceptedCount();
> }
> {code}
> SpoolDirectorySource sleep a backoff time and re-calculate backoff time,
> and continue to read next event DO NOTHING with the event that not put into
> channel
> There is no exception but only logger.warn that sound not important
> This is a bug to me, but I'm not sure is this a bug
> I use a simple while loop to fix this, just like
> {code:title=Bar.java|borderStyle=solid}
> boolean eventPushed = false;
> while(!eventPushed) {
> try {
> getChannelProcessor().processEventBatch(events);
> reader.commit();
> eventPushed = true;
> } catch (ChannelException ex) {
> logger.warn("The channel is full, and cannot write data
> now. The " +
> "source will try again after " +
> String.valueOf(backoffInterval) +
> " milliseconds");
> hitChannelException = true;
> if (backoff) {
> TimeUnit.MILLISECONDS.sleep(backoffInterval);
> backoffInterval = backoffInterval << 1;
> backoffInterval = backoffInterval >= maxBackoff ?
> maxBackoff :
> backoffInterval;
> }
> }
>
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)