PaulZeng created FLUME-3438:
-------------------------------

             Summary: The transaction close not correctly
                 Key: FLUME-3438
                 URL: https://issues.apache.org/jira/browse/FLUME-3438
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: 1.10.1, 1.10.0
         Environment: linux
            Reporter: PaulZeng


Here is my code:
{code:java}
 @Override
    public Sink.Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        logger.debug("open transaction for: {}", channel.getName());
        transaction.begin();
        List<Event> eventList = new ArrayList<>();
        try {
            while (eventList.size() < maxBatchSize) {
                Event event = channel.take();
                if (event == null) {
                    if (eventList.isEmpty()) {
                        transaction.commit();
                        logger.debug("No data in channel!");
                        return Sink.Status.BACKOFF;
                    } else {
                        break;
                    }
                }
                logger.trace(EventHelper.eventBodyToString(event));
                eventList.add(event);
            }
            handleEvent(eventList);
            logger.debug("commit for: {}", channel.getName());
            transaction.commit();
            sinkCounter.addToEventDrainSuccessCount(eventList.size());
        } catch (Exception ex) {
            logger.debug("rollback for: {}", channel.getName());
            transaction.rollback();
            sinkCounter.incrementChannelReadFail();
            throw new EventDeliveryException("Failed to log event: " + 
ex.getMessage(), ex);
        } finally {
            logger.debug("close for: {}", channel.getName());
            transaction.close();
        }

        return Sink.Status.READY;
    }
{code}

I found that the program will goto the close without commit or rollback while 
the running time of handleEvent() is too long(or maybe orther reason). It seens 
that the run() function has been forced to stop by any other things. This 
problem cause the sink process fail because the  transaction is not 
commit/rollback correctly.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to