Repository: incubator-unomi
Updated Branches:
  refs/heads/master bd9b3e1be -> b70cda666


Avoid re-consuming the same file when processing it fails (failed files are moved to .error)


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/b70cda66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/b70cda66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/b70cda66

Branch: refs/heads/master
Commit: b70cda6669576211fc00dfe1cf350f112ed6ed46
Parents: bd9b3e1
Author: Abdelkader Midani <[email protected]>
Authored: Fri Dec 1 11:16:02 2017 +0100
Committer: Abdelkader Midani <[email protected]>
Committed: Fri Dec 1 11:16:02 2017 +0100

----------------------------------------------------------------------
 .../unomi/router/core/processor/LineSplitProcessor.java       | 6 +++---
 .../core/route/ProfileImportFromSourceRouteBuilder.java       | 6 +++++-
 .../router/core/route/ProfileImportOneShotRouteBuilder.java   | 7 +++++--
 3 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b70cda66/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
index 7240f90..b73a303 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
@@ -89,15 +89,15 @@ public class LineSplitProcessor implements Processor {
 
         String[] profileData = rfc4180Parser.parseLine(((String) 
exchange.getIn().getBody()));
 
-        logger.debug("$$$$ : LineSplitProcessor : LINE : {}, {}, {}.", 
profileData[0], profileData[1], profileData[2]);
-
         ProfileToImport profileToImport = new ProfileToImport();
         profileToImport.setItemId(UUID.randomUUID().toString());
         profileToImport.setItemType("profile");
         profileToImport.setScope(RouterConstants.SYSTEM_SCOPE);
 
         if (profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) {
-            if (hasDeleteColumn && (fieldsMapping.size() > (profileData.length 
- 1))) {
+            if ((hasDeleteColumn && (fieldsMapping.size() > 
(profileData.length - 1)))
+                    || (!hasDeleteColumn && (fieldsMapping.size() > 
(profileData.length)))
+                    ) {
                 throw new BadProfileDataFormatException("The mapping does not 
match the number of column : line [" + ((Integer) 
exchange.getProperty("CamelSplitIndex") + 1) + "]", new 
Throwable("MAPPING_COLUMN_MATCH"));
             }
             logger.debug("$$$$ : LineSplitProcessor : MAPPING : " + 
fieldsMapping.keySet());

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b70cda66/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
index 90599f9..058cba6 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
@@ -63,7 +63,10 @@ public class ProfileImportFromSourceRouteBuilder extends 
RouterAbstractRouteBuil
         ProcessorDefinition prDefErr = 
onException(BadProfileDataFormatException.class)
                 .log(LoggingLevel.ERROR, "Error processing record 
${exchangeProperty.CamelSplitIndex}++ !")
                 .handled(true)
-                .process(new LineSplitFailureHandler());
+                .process(new LineSplitFailureHandler())
+                .onException(Exception.class)
+                .log(LoggingLevel.ERROR, "Failed to process file.")
+                .handled(true);
 
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
             prDefErr.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
@@ -90,6 +93,7 @@ public class ProfileImportFromSourceRouteBuilder extends 
RouterAbstractRouteBuil
                 lineSplitProcessor.setProfileService(profileService);
 
                 String endpoint = (String) 
importConfiguration.getProperties().get("source");
+                endpoint += "&moveFailed=.error";
 
                 if (StringUtils.isNotBlank(endpoint) && 
allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) {
                     ProcessorDefinition prDef = from(endpoint)

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b70cda66/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
index 731ac4c..99e53e6 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
@@ -50,7 +50,10 @@ public class ProfileImportOneShotRouteBuilder extends 
RouterAbstractRouteBuilder
         ProcessorDefinition prDefErr = 
onException(BadProfileDataFormatException.class)
                 .log(LoggingLevel.ERROR, "Error processing record 
${exchangeProperty.CamelSplitIndex}++ !")
                 .handled(true)
-                .process(new LineSplitFailureHandler());
+                .process(new LineSplitFailureHandler())
+                .onException(Exception.class)
+                .log(LoggingLevel.ERROR, "Failed to process file.")
+                .handled(true);
 
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
             prDefErr.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
@@ -61,7 +64,7 @@ public class ProfileImportOneShotRouteBuilder extends 
RouterAbstractRouteBuilder
         LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
         lineSplitProcessor.setProfileService(profileService);
 
-        ProcessorDefinition prDef = from("file://" + uploadDir + 
"?include=.*.csv&consumer.delay=1m")
+        ProcessorDefinition prDef = from("file://" + uploadDir + 
"?moveFailed=.error&include=.*.csv&consumer.delay=1m")
                 .routeId(RouterConstants.IMPORT_ONESHOT_ROUTE_ID)
                 .autoStartup(true)
                 .process(importConfigByFileNameProcessor)

Reply via email to