PaulZeng created FLUME-3438:
-------------------------------
Summary: The transaction is not closed correctly
Key: FLUME-3438
URL: https://issues.apache.org/jira/browse/FLUME-3438
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: 1.10.1, 1.10.0
Environment: linux
Reporter: PaulZeng
Here is my code:
{code:java}
@Override
public Sink.Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  logger.debug("open transaction for: {}", channel.getName());
  transaction.begin();
  List<Event> eventList = new ArrayList<>();
  try {
    while (eventList.size() < maxBatchSize) {
      Event event = channel.take();
      if (event == null) {
        if (eventList.isEmpty()) {
          transaction.commit();
          logger.debug("No data in channel!");
          return Sink.Status.BACKOFF;
        } else {
          break;
        }
      }
      logger.trace(EventHelper.eventBodyToString(event));
      eventList.add(event);
    }
    handleEvent(eventList);
    logger.debug("commit for: {}", channel.getName());
    transaction.commit();
    sinkCounter.addToEventDrainSuccessCount(eventList.size());
  } catch (Exception ex) {
    logger.debug("rollback for: {}", channel.getName());
    transaction.rollback();
    sinkCounter.incrementChannelReadFail();
    throw new EventDeliveryException("Failed to log event: " +
        ex.getMessage(), ex);
  } finally {
    logger.debug("close for: {}", channel.getName());
    transaction.close();
  }
  return Sink.Status.READY;
}
{code}
I found that when handleEvent() runs for too long (or possibly for some other
reason), the program reaches close() without having called commit() or
rollback(). It seems that the run() function has been forcibly stopped by
something else. This causes the sink's process() to fail, because the
transaction is never committed or rolled back correctly.
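
The report does not establish why close() is reached without commit() or rollback(). One possibility that fits the posted code, offered only as a hypothesis, is that handleEvent() throws a java.lang.Error (for example an OutOfMemoryError or NoClassDefFoundError). The catch block only handles Exception, so an Error skips it and goes straight to finally. The sketch below reproduces that control flow without Flume. RecordingTransaction, processCatchingException and processCatchingThrowable are hypothetical names made up for this illustration, and the Error is simulated.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseWithoutRollbackDemo {

    /** Hypothetical stand-in for a Flume Transaction; it only records the calls it receives. */
    static class RecordingTransaction {
        final List<String> calls = new ArrayList<>();
        void begin()    { calls.add("begin"); }
        void commit()   { calls.add("commit"); }
        void rollback() { calls.add("rollback"); }
        void close()    { calls.add("close"); }
    }

    /** Same try/catch/finally shape as process() in the report: only Exception is caught. */
    static List<String> processCatchingException(Runnable handleEvent) {
        RecordingTransaction tx = new RecordingTransaction();
        tx.begin();
        try {
            try {
                handleEvent.run();
                tx.commit();
            } catch (Exception ex) {
                tx.rollback();
            } finally {
                tx.close();
            }
        } catch (Throwable t) {
            // Swallowed only so the demo can return the recorded calls.
        }
        return tx.calls;
    }

    /** Variant that rolls back on any Throwable and then re-throws Errors. */
    static List<String> processCatchingThrowable(Runnable handleEvent) {
        RecordingTransaction tx = new RecordingTransaction();
        tx.begin();
        try {
            try {
                handleEvent.run();
                tx.commit();
            } catch (Throwable t) {
                tx.rollback();
                if (t instanceof Error) {
                    throw (Error) t;
                }
                // A real sink would wrap other failures in EventDeliveryException here.
            } finally {
                tx.close();
            }
        } catch (Throwable t) {
            // Swallowed only so the demo can return the recorded calls.
        }
        return tx.calls;
    }

    public static void main(String[] args) {
        // Simulated failure: an Error, which "catch (Exception ex)" does not handle.
        Runnable failsWithError = () -> { throw new NoClassDefFoundError("simulated"); };
        System.out.println(processCatchingException(failsWithError)); // [begin, close]
        System.out.println(processCatchingThrowable(failsWithError)); // [begin, rollback, close]
    }
}
```

If this is what happens in the real sink, logging the Throwable in a catch (Throwable t) block should show it. If nothing is logged there, the cause lies elsewhere and this sketch does not apply.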