Luis Miguel created CAMEL-12620:
-----------------------------------
Summary: CompletionAwareAggregationStrategy onCompletition method
null exchange.
Key: CAMEL-12620
URL: https://issues.apache.org/jira/browse/CAMEL-12620
Project: Camel
Issue Type: Bug
Reporter: Luis Miguel
Hello.
We have a camel project to process some Csv files, we created a class
implementing "CompletionAwareAggregationStrategy" in order to aggregate each
row processed and we override the methods "aggregate" and "onCompletion" from
it.
The way we process the Csv file is parallelized with the "parallelProcessing"
and indicating an "executorService" with 8 concurrently threads.
We are having a weird issue that rarely happens, and is that in the middle of
the process of the file, the method "onCompletion" is being called (even when
the file is not complete yet) and it sets the argument "Exchange" as NULL so
the whole camel route is messed up.
As I said this rarely happens when we try to reprocess the file the error is
gone.
Here's the main route that process the CSV file, notice that for the split
process we create an instance of "CsvAggregationStrategy"
{code:java}
@Qualifier("executorServicePublicationItemCsv")
@Autowired
private ExecutorService executorService;
@Override
public void configure() throws Exception {
String[] header = Arrays.stream(PublicationItemCsvFields.values())
.map(PublicationItemCsvFields::getText).toArray(String[]::new);
Map<String, String> csvFieldsByEntityAttribute = new HashMap<>();
mapFields(csvFieldsByEntityAttribute);
//@formatter:off
from("file:publicationItemData?delete={{routes.push-csv-to-service.delete-source-file}}")
.streamCaching()
.routeId("push-publication-item-csv-to-service")
.onException(Exception.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error in publication item route, sending an email:
${exception.message} ${exception.stacktrace}")
.to("direct:sendImportErrorReport")
.end()
.log(LoggingLevel.INFO, "Beginning to import publication item CSV:
${file:onlyname}")
.unmarshal(new CsvDataFormat()
.setSkipHeaderRecord(true)
.setNullString(EMPTY)
.setLazyLoad(true))
.split(body(), new CsvAggregationStrategy())
.streaming()
.parallelProcessing().executorService(executorService)
.to("direct:publication-item-splitter")
.end()
.choice()
.when(simple("${exchangeProperty.aggregationError} != null"))
.log("An error occurred when aggregating exchanges, sending an email with the
error.")
.setProperty("original_body", body())
.to("direct:sendAggregationErrorEmail")
.setBody(exchangeProperty("original_body"))
.end()
.choice()
.when(simple("${exchangeProperty.badCsvData.size()} > 0"))
.setBody(simple("${exchangeProperty.badCsvData}"))
.marshal(new CsvDataFormat().setHeader(header))
.setProperty("badRowsBody").simple("${body}")
.end()
.choice()
.when(simple("${exchangeProperty.successfulRecords.size()} > 0"))
.setBody(simple("${exchangeProperty.successfulRecords}"))
.marshal(new CsvDataFormat().setHeader(header))
.setProperty("successfulRowsBody").simple("${body}")
.end()
.to("direct:sendImportReport").end()
.log("Completed import for publication item CSV: '${file:onlyname}'");
from("direct:publication-item-splitter")
.streamCaching()
.routeId("push-publication-item-splitter")
.onException(PublicationItemImportException.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error importing publication item data:
${exception.message} ${exception.stacktrace}")
.end()
.onException(HttpHostConnectException.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error connecting to publication item service host:
${exception.host}. Request body: ${body}")
.end()
.onException(HttpOperationFailedException.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error received from publication item service: HTTP
${exception.statusCode}. Response body: ${exception.responseBody}. Request
body: ${body}")
.end()
.onException(Exception.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error: ${exception.message} ${exception.stacktrace}")
.end()
.setProperty("csvRowData").simple("${body}", List.class)
.setProperty("csvFieldsByEntityAttribute").constant(csvFieldsByEntityAttribute)
.bean(publicationItemCSVDataHandler)
.marshal().json(JsonLibrary.Jackson)
.setHeader(HttpHeaders.AUTHORIZATION, simple("Basic
"+propertyServiceAuthorization))
.log("Item ID: ${property.itemId}")
.choice()
.when().simple("${property.itemId} != null")
.setHeader(Exchange.HTTP_PATH, simple("${property.itemId}"))
.to("rest:PUT:items?host={{backend.event-service.host}}")
.otherwise()
.to("rest:POST:items?host={{backend.event-service.host}}")
.end()
.setProperty("responseId").jsonpath("$.id", true)
.setProperty("idColumnPosition").constant(PublicationItemCsvFields.ID.getNumber())
.choice()
.when(exchangeProperty("responseId").isNull())
.throwException(PublicationItemImportException.class, "Unexpected rest "
+ "response (no id returned)")
.otherwise()
.end();
//@formatter:on
}
{code}
And this is the CsvAggregationStrategy, we're getting a null pointer Exception
in the onCompletition method due to a null exchange.
{code:java}
@Slf4j
public class CsvAggregationStrategy implements
CompletionAwareAggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
try {
if (oldExchange == null) {
oldExchange = newExchange;
oldExchange.setProperty("badCsvData", new TreeMap<>()); // TreeMap ensures a
sorted order
oldExchange.setProperty("successfulRecords", new TreeMap<>());
}
if (newExchange.getProperty(Exchange.SPLIT_COMPLETE, boolean.class)) {
oldExchange.setProperty("numberOfCSVRows",
newExchange.getProperty(Exchange.SPLIT_SIZE));
}
Exception exception = newExchange.getProperty(Exchange.EXCEPTION_CAUGHT,
Exception.class);
if (exception != null) {
@SuppressWarnings("unchecked")
Map<Integer, Object> badCsvData = oldExchange.getProperty("badCsvData",
Map.class);
@SuppressWarnings("unchecked")
List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);
if (exception instanceof HttpOperationFailedException) {
Map<String, String> csvFieldsByEntityAttribute =
newExchange.getProperty("csvFieldsByEntityAttribute", Map.class);
String responseBody = ((HttpOperationFailedException)
exception).getResponseBody();
String errorMessage = getErrorMessage(responseBody, csvFieldsByEntityAttribute,
((HttpOperationFailedException) exception).getStatusCode());
csvRowData.add(errorMessage);
} else {
csvRowData.add(exception.getMessage());
}
badCsvData.put(newExchange.getProperty(Exchange.SPLIT_INDEX, Integer.class),
csvRowData);
oldExchange.setProperty("badCsvData", badCsvData);
}else {
@SuppressWarnings("unchecked")
Map<Integer, Object> sucessRecords =
oldExchange.getProperty("successfulRecords", Map.class);
@SuppressWarnings("unchecked")
List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);
Integer idPosition = (Integer) newExchange.getProperty("idColumnPosition");
if(idPosition != null) {
csvRowData.set(idPosition, (String)newExchange.getProperty("responseId"));
}else {
csvRowData.add((String)newExchange.getProperty("responseId"));
}
sucessRecords.put(newExchange.getProperty(Exchange.SPLIT_INDEX, Integer.class),
csvRowData);
oldExchange.setProperty("successfulRecords", sucessRecords);
}
} catch(Exception e) {
log.error("Error when trying to aggregate exchanges: " + e.getMessage(), e);
if(oldExchange != null) {
oldExchange.setProperty("aggregationError", ExceptionUtils.getStackTrace(e));
}
}
return oldExchange;
}
@Override
public void onCompletion(Exchange exchange) {
@SuppressWarnings("unchecked")
Map<Integer, List<String>> badCsvData = exchange.getProperty("badCsvData",
Map.class);
exchange.setProperty("badCsvData", new ArrayList<>(badCsvData.values()));
@SuppressWarnings("unchecked")
Map<Integer, List<String>> succesfulCsvData =
exchange.getProperty("successfulRecords", Map.class);
exchange.setProperty("successfulRecords", new
ArrayList<>(succesfulCsvData.values()));
/* Removing Exception/Failure properties if any occurred while processing the
CSV rows. */
exchange.removeProperties("CamelFailure*");
exchange.removeProperties("CamelException*");
exchange.removeProperties("CamelError*");
}
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)