Repository: incubator-unomi Updated Branches: refs/heads/master bd9b3e1be -> b70cda666
Avoid re-consuming the same file when it fails Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/b70cda66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/b70cda66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/b70cda66 Branch: refs/heads/master Commit: b70cda6669576211fc00dfe1cf350f112ed6ed46 Parents: bd9b3e1 Author: Abdelkader Midani <[email protected]> Authored: Fri Dec 1 11:16:02 2017 +0100 Committer: Abdelkader Midani <[email protected]> Committed: Fri Dec 1 11:16:02 2017 +0100 ---------------------------------------------------------------------- .../unomi/router/core/processor/LineSplitProcessor.java | 6 +++--- .../core/route/ProfileImportFromSourceRouteBuilder.java | 6 +++++- .../router/core/route/ProfileImportOneShotRouteBuilder.java | 7 +++++-- 3 files changed, 13 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b70cda66/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java index 7240f90..b73a303 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -89,15 +89,15 @@ public class LineSplitProcessor implements Processor { String[] profileData = rfc4180Parser.parseLine(((String) exchange.getIn().getBody())); - logger.debug("$$$$ : LineSplitProcessor : LINE : {}, {}, {}.", profileData[0], profileData[1], profileData[2]); - ProfileToImport profileToImport = new ProfileToImport(); profileToImport.setItemId(UUID.randomUUID().toString()); profileToImport.setItemType("profile"); profileToImport.setScope(RouterConstants.SYSTEM_SCOPE); if (profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) { - if (hasDeleteColumn && (fieldsMapping.size() > (profileData.length - 1))) { + if ((hasDeleteColumn && (fieldsMapping.size() > (profileData.length - 1))) + || (!hasDeleteColumn && (fieldsMapping.size() > (profileData.length))) + ) { throw new BadProfileDataFormatException("The mapping does not match the number of column : line [" + ((Integer) exchange.getProperty("CamelSplitIndex") + 1) + "]", new Throwable("MAPPING_COLUMN_MATCH")); } logger.debug("$$$$ : LineSplitProcessor : MAPPING : " + fieldsMapping.keySet()); http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b70cda66/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java index 90599f9..058cba6 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -63,7 +63,10 @@ public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuil ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class) .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !") .handled(true) - .process(new LineSplitFailureHandler()); + .process(new LineSplitFailureHandler()) + .onException(Exception.class) + .log(LoggingLevel.ERROR, "Failed to process file.") + .handled(true); if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); @@ -90,6 +93,7 @@ public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuil lineSplitProcessor.setProfileService(profileService); String endpoint = (String) importConfiguration.getProperties().get("source"); + endpoint += "&moveFailed=.error"; if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) { ProcessorDefinition prDef = from(endpoint) http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b70cda66/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java index 731ac4c..99e53e6 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -50,7 +50,10 @@ public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class) .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !") .handled(true) - .process(new LineSplitFailureHandler()); + .process(new LineSplitFailureHandler()) + .onException(Exception.class) + .log(LoggingLevel.ERROR, "Failed to process file.") + .handled(true); if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); @@ -61,7 +64,7 @@ public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); lineSplitProcessor.setProfileService(profileService); - ProcessorDefinition prDef = from("file://" + uploadDir + "?include=.*.csv&consumer.delay=1m") + ProcessorDefinition prDef = from("file://" + uploadDir + "?moveFailed=.error&include=.*.csv&consumer.delay=1m") .routeId(RouterConstants.IMPORT_ONESHOT_ROUTE_ID) .autoStartup(true) .process(importConfigByFileNameProcessor)
