[
https://issues.apache.org/jira/browse/CAMEL-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897411#comment-16897411
]
Claus Ibsen commented on CAMEL-12620:
-------------------------------------
We need a much simpler reproducer of this issue you think there is.
> CompletionAwareAggregationStrategy onCompletition method null exchange.
> -----------------------------------------------------------------------
>
> Key: CAMEL-12620
> URL: https://issues.apache.org/jira/browse/CAMEL-12620
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Reporter: Luis Miguel
> Priority: Major
>
> Hello.
>
> We have a camel project to process some Csv files, we created a class
> implementing "CompletionAwareAggregationStrategy" in order to aggregate each
> row processed and we override the methods "aggregate" and "onCompletion" from
> it.
> The way we process the Csv file is parallelized with the "parallelProcessing"
> and indicating an "executorService" with 8 concurrently threads.
>
> We are having a weird issue that rarely happens, and is that in the middle of
> the process of the file, the method "onCompletion" is being called (even when
> the file is not complete yet) and it sets the argument "Exchange" as NULL so
> the whole camel route is messed up.
> As I said this rarely happens when we try to reprocess the file the error is
> gone.
>
>
>
> Here's the main route that process the CSV file, notice that for the split
> process we create an instance of "CsvAggregationStrategy"
>
> {code:java}
> @Qualifier("executorServicePublicationItemCsv")
> @Autowired
> private ExecutorService executorService;
> @Override
> public void configure() throws Exception {
> String[] header = Arrays.stream(PublicationItemCsvFields.values())
> .map(PublicationItemCsvFields::getText).toArray(String[]::new);
> Map<String, String> csvFieldsByEntityAttribute = new HashMap<>();
> mapFields(csvFieldsByEntityAttribute);
> //@formatter:off
> from("file:publicationItemData?delete={{routes.push-csv-to-service.delete-source-file}}")
> .streamCaching()
> .routeId("push-publication-item-csv-to-service")
> .onException(Exception.class)
> .handled(true)
> .log(LoggingLevel.ERROR, "Error in publication item route, sending an email:
> ${exception.message} ${exception.stacktrace}")
> .to("direct:sendImportErrorReport")
> .end()
> .log(LoggingLevel.INFO, "Beginning to import publication item CSV:
> ${file:onlyname}")
> .unmarshal(new CsvDataFormat()
> .setSkipHeaderRecord(true)
> .setNullString(EMPTY)
> .setLazyLoad(true))
> .split(body(), new CsvAggregationStrategy())
> .streaming()
> .parallelProcessing().executorService(executorService)
> .to("direct:publication-item-splitter")
> .end()
> .choice()
> .when(simple("${exchangeProperty.aggregationError} != null"))
> .log("An error occurred when aggregating exchanges, sending an email with the
> error.")
> .setProperty("original_body", body())
> .to("direct:sendAggregationErrorEmail")
> .setBody(exchangeProperty("original_body"))
> .end()
> .choice()
> .when(simple("${exchangeProperty.badCsvData.size()} > 0"))
> .setBody(simple("${exchangeProperty.badCsvData}"))
> .marshal(new CsvDataFormat().setHeader(header))
> .setProperty("badRowsBody").simple("${body}")
> .end()
> .choice()
> .when(simple("${exchangeProperty.successfulRecords.size()} > 0"))
> .setBody(simple("${exchangeProperty.successfulRecords}"))
> .marshal(new CsvDataFormat().setHeader(header))
> .setProperty("successfulRowsBody").simple("${body}")
> .end()
> .to("direct:sendImportReport").end()
> .log("Completed import for publication item CSV: '${file:onlyname}'");
> from("direct:publication-item-splitter")
> .streamCaching()
> .routeId("push-publication-item-splitter")
> .onException(PublicationItemImportException.class)
> .handled(true)
> .log(LoggingLevel.ERROR, "Error importing publication item data:
> ${exception.message} ${exception.stacktrace}")
> .end()
> .onException(HttpHostConnectException.class)
> .handled(true)
> .log(LoggingLevel.ERROR, "Error connecting to publication item service host:
> ${exception.host}. Request body: ${body}")
> .end()
> .onException(HttpOperationFailedException.class)
> .handled(true)
> .log(LoggingLevel.ERROR, "Error received from publication item service: HTTP
> ${exception.statusCode}. Response body: ${exception.responseBody}. Request
> body: ${body}")
> .end()
> .onException(Exception.class)
> .handled(true)
> .log(LoggingLevel.ERROR, "Error: ${exception.message}
> ${exception.stacktrace}")
> .end()
> .setProperty("csvRowData").simple("${body}", List.class)
> .setProperty("csvFieldsByEntityAttribute").constant(csvFieldsByEntityAttribute)
> .bean(publicationItemCSVDataHandler)
> .marshal().json(JsonLibrary.Jackson)
> .setHeader(HttpHeaders.AUTHORIZATION, simple("Basic
> "+propertyServiceAuthorization))
> .log("Item ID: ${property.itemId}")
> .choice()
> .when().simple("${property.itemId} != null")
> .setHeader(Exchange.HTTP_PATH, simple("${property.itemId}"))
> .to("rest:PUT:items?host={{backend.event-service.host}}")
> .otherwise()
> .to("rest:POST:items?host={{backend.event-service.host}}")
> .end()
> .setProperty("responseId").jsonpath("$.id", true)
> .setProperty("idColumnPosition").constant(PublicationItemCsvFields.ID.getNumber())
> .choice()
> .when(exchangeProperty("responseId").isNull())
> .throwException(PublicationItemImportException.class, "Unexpected rest "
> + "response (no id returned)")
> .otherwise()
> .end();
> //@formatter:on
> }
> {code}
>
> And this is the CsvAggregationStrategy, we're getting a null pointer
> Exception in the onCompletition method due to a null exchange.
>
> {code:java}
> @Slf4j
> public class CsvAggregationStrategy implements
> CompletionAwareAggregationStrategy {
> @Override
> public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
> try {
> if (oldExchange == null) {
> oldExchange = newExchange;
> oldExchange.setProperty("badCsvData", new TreeMap<>()); // TreeMap ensures a
> sorted order
> oldExchange.setProperty("successfulRecords", new TreeMap<>());
> }
> if (newExchange.getProperty(Exchange.SPLIT_COMPLETE, boolean.class)) {
> oldExchange.setProperty("numberOfCSVRows",
> newExchange.getProperty(Exchange.SPLIT_SIZE));
> }
> Exception exception = newExchange.getProperty(Exchange.EXCEPTION_CAUGHT,
> Exception.class);
> if (exception != null) {
> @SuppressWarnings("unchecked")
> Map<Integer, Object> badCsvData = oldExchange.getProperty("badCsvData",
> Map.class);
> @SuppressWarnings("unchecked")
> List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);
> if (exception instanceof HttpOperationFailedException) {
> Map<String, String> csvFieldsByEntityAttribute =
> newExchange.getProperty("csvFieldsByEntityAttribute", Map.class);
> String responseBody = ((HttpOperationFailedException)
> exception).getResponseBody();
> String errorMessage = getErrorMessage(responseBody,
> csvFieldsByEntityAttribute,
> ((HttpOperationFailedException) exception).getStatusCode());
> csvRowData.add(errorMessage);
> } else {
> csvRowData.add(exception.getMessage());
> }
> badCsvData.put(newExchange.getProperty(Exchange.SPLIT_INDEX, Integer.class),
> csvRowData);
> oldExchange.setProperty("badCsvData", badCsvData);
> }else {
> @SuppressWarnings("unchecked")
> Map<Integer, Object> sucessRecords =
> oldExchange.getProperty("successfulRecords", Map.class);
> @SuppressWarnings("unchecked")
> List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);
> Integer idPosition = (Integer) newExchange.getProperty("idColumnPosition");
> if(idPosition != null) {
> csvRowData.set(idPosition, (String)newExchange.getProperty("responseId"));
> }else {
> csvRowData.add((String)newExchange.getProperty("responseId"));
> }
> sucessRecords.put(newExchange.getProperty(Exchange.SPLIT_INDEX,
> Integer.class), csvRowData);
> oldExchange.setProperty("successfulRecords", sucessRecords);
> }
> } catch(Exception e) {
> log.error("Error when trying to aggregate exchanges: " + e.getMessage(), e);
> if(oldExchange != null) {
> oldExchange.setProperty("aggregationError", ExceptionUtils.getStackTrace(e));
> }
> }
> return oldExchange;
> }
> @Override
> public void onCompletion(Exchange exchange) {
> @SuppressWarnings("unchecked")
> Map<Integer, List<String>> badCsvData = exchange.getProperty("badCsvData",
> Map.class);
> exchange.setProperty("badCsvData", new ArrayList<>(badCsvData.values()));
> @SuppressWarnings("unchecked")
> Map<Integer, List<String>> succesfulCsvData =
> exchange.getProperty("successfulRecords", Map.class);
> exchange.setProperty("successfulRecords", new
> ArrayList<>(succesfulCsvData.values()));
> /* Removing Exception/Failure properties if any occurred while processing the
> CSV rows. */
> exchange.removeProperties("CamelFailure*");
> exchange.removeProperties("CamelException*");
> exchange.removeProperties("CamelError*");
> }
> }
> {code}
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)