Jonathan Schoreels created CAMEL-16903:
------------------------------------------

             Summary: JdbcAggregationRepository not storing properties 
interfers with AggregateProcessor to restore completiontimeouts
                 Key: CAMEL-16903
                 URL: https://issues.apache.org/jira/browse/CAMEL-16903
             Project: Camel
          Issue Type: Bug
          Components: came-core, camel-sql
            Reporter: Jonathan Schoreels


Hello,

I noticed during some testing with our production app that when a camel context 
is restarted, the completation timeouts of the aggregation were lost. I had the 
message :

"Restored 0 CompletionTimeout conditions in the AggregationTimeoutChecker in 1 
minute"

 

After some investigation, it looks the timeouts are restored from the 
properties of the exchange, snippet from AggregateProcessor in branch master :

 
{code:java}
protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
    // grab the timeout value for each partly aggregated exchange
    Set<String> keys = aggregationRepository.getKeys();
    if (keys == null || keys.isEmpty()) {
        return;
    }

    StopWatch watch = new StopWatch();
    LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges 
from the aggregation repository...",
            keys.size());

    for (String key : keys) {
        Exchange exchange = aggregationRepository.get(camelContext, key);
        // grab the timeout value
        long timeout = exchange.hasProperties() ? 
exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
        if (timeout > 0) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Restoring CompletionTimeout for exchangeId: {} with 
timeout: {} millis.",
                        exchange.getExchangeId(), timeout);
            }
            addExchangeToTimeoutMap(key, exchange, timeout);
        }
    }

    // log duration of this task so end user can see how long it takes to 
pre-check this upon starting
    LOG.info("Restored {} CompletionTimeout conditions in the 
AggregationTimeoutChecker in {}",
            timeoutMap.size(), TimeUtils.printDuration(watch.taken()));
}

{code}
But, as the JdbcAggregationRepository doesn't store properties (by design I 
think), the timeout is lost and the aggregations are restored without ones.

 

This can be problematic because it causes a leakage of of element that will 
never be completed if the aggregation should have finished by a timeout.

 

As I see it, there is two possibilities :
 * Modifying the current implementation of AggregateProcessor to load those 
from Headers : Quite impactful for something that only few aggregation 
repository will need
 * Modifying JdbcAggregationRepository to store the exchange properties : may 
leak some internal / extra information in the database.

What's your thoughts ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to