UNOMI-101 : New feature to add the ability to import profiles

Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/89d4c8eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/89d4c8eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/89d4c8eb

Branch: refs/heads/master
Commit: 89d4c8eb9646705cfd6c4bdb79fdac27ee9a5634
Parents: 0f44140
Author: Abdelkader Midani <amid...@apache.org>
Authored: Tue May 23 02:30:28 2017 +0200
Committer: Abdelkader Midani <amid...@apache.org>
Committed: Mon Jun 12 20:09:59 2017 +0200

----------------------------------------------------------------------
 extensions/pom.xml                              |   1 +
 extensions/router/README.md                     | 113 ++++++++++
 extensions/router/pom.xml                       |  64 ++++++
 extensions/router/router-api/pom.xml            |  43 ++++
 .../unomi/router/api/ImportConfiguration.java   | 223 +++++++++++++++++++
 .../unomi/router/api/ProfileToImport.java       |  77 +++++++
 .../services/ImportConfigurationService.java    |  60 +++++
 .../api/services/ProfileImportService.java      |  29 +++
 extensions/router/router-core/pom.xml           | 182 +++++++++++++++
 .../core/context/ProfileImportCamelContext.java | 170 ++++++++++++++
 .../core/processor/ConfigUpdateProcessor.java   |  44 ++++
 .../ImportConfigByFileNameProcessor.java        |  44 ++++
 .../core/processor/LineSplitProcessor.java      | 114 ++++++++++
 .../core/processor/UnomiStorageProcessor.java   |  46 ++++
 .../ProfileImportConfigUpdateRouteBuilder.java  |  62 ++++++
 .../ProfileImportKafkaToUnomiRouteBuilder.java  |  77 +++++++
 .../route/ProfileImportOneShotRouteBuilder.java |  99 ++++++++
 .../ProfileImportSourceToKafkaRouteBuilder.java | 120 ++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |  94 ++++++++
 .../main/resources/org.apache.unomi.router.cfg  |  25 +++
 extensions/router/router-karaf-feature/pom.xml  | 157 +++++++++++++
 extensions/router/router-rest/pom.xml           |  75 +++++++
 .../ImportConfigurationServiceEndPoint.java     | 176 +++++++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |  80 +++++++
 extensions/router/router-service/pom.xml        | 104 +++++++++
 .../ImportConfigurationServiceImpl.java         | 114 ++++++++++
 .../services/ProfileImportServiceImpl.java      | 121 ++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |  39 ++++
 itests/pom.xml                                  |   8 +-
 package/pom.xml                                 |  11 +
 performance-tests/pom.xml                       |   4 +-
 ...g.apache.unomi.persistence.elasticsearch.cfg |   2 +-
 persistence-elasticsearch/plugins/pom.xml       |   2 +-
 samples/trainingplugin.zip                      | Bin 0 -> 48508 bytes
 samples/trainingplugin/pom.xml                  |  55 +++++
 .../training/TrainedNotificationAction.java     |  61 +++++
 .../cxs/actions/trainingNotifAction.json        |  13 ++
 .../META-INF/cxs/rules/trainedNotification.json |  20 ++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |  34 +++
 39 files changed, 2755 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 7126b61..dea4bc2 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -34,6 +34,7 @@
         <module>lists-extension</module>
         <module>privacy-extension</module>
         <module>geonames</module>
+        <module>router</module>
     </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/README.md
----------------------------------------------------------------------
diff --git a/extensions/router/README.md b/extensions/router/README.md
new file mode 100644
index 0000000..0d382d4
--- /dev/null
+++ b/extensions/router/README.md
@@ -0,0 +1,113 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+Unomi Router
+==========================
+
+## Getting started
+Unomi Router Extension a Karaf Feature that provide an Enterprise Application 
Integration tool.
+It is optional so you must configure it and install it in Karaf, and can be 
used for Machine - Machine or Human - Machine integration with Unomi.
+Mainly Unomi Router Extension aim to make it easy to import third party 
applications/platforms profiles into Unomi.
+This extension is implemented using Apache Camel routes and is using Apache 
Kafka to buffer import process and make it failsafe. 
+
+## Getting started
+1. Configure your Unomi Router:
+    In the `etc/org.apache.unomi.router.cfg` file, you might want to update 
the following settings:
+    Kafka settings 
+    >`#Kafka settings`
+    
+    >`kafka.host=localhost`
+    
+    >`kafka.port=9092`
+    
+    >`kafka.import.topic=camel-deposit`
+    
+    >`kafka.import.groupId=unomi-import-group`
+    
+    Kafka host and port with the topic name and the groupId ti which the topic 
is assigned
+    
+    >`#Import One Shot upload directory`
+    
+    >`import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/`
+   
+    Path to the folder where unomi should stock file imported for a oneshot 
processing
+    
+
+2. Deploy into Apache Unomi using the following commands from the Apache Karaf 
shell:
+    ```sh
+    $ feature:repo-add 
mvn:org.apache.unomi/unomi-router-karaf-feature/${version}/xml/features
+    $ feature:install unomi-router-karaf-feature
+    ```
+    
+3. Send your import configuration:
+
+    An import configuration is nothing else than a simple JSON to describe how 
you want to import your data (Profiles).
+    To create/update an import configuration
+    
+    `POST /cxs/importConfiguration`
+    ```json
+     {
+         "itemId": "f57f1f86-97bf-4ba0-b4e4-7d5e77e7c0bd",
+         "itemType": "importConfig",
+         "scope": "integration",
+         "name": "Test Recurrent",
+         "description": "Just test recurrent import",
+         "configType": "recurrent",
+         "properties": {
+           "source": 
"{file/ftp}://{path}?fileName={file-name}.csv&move=.done&consumer.delay=20000",
+           "mapping": {
+             "firstName": 0,
+             "lastName": 1,
+             ...
+           }
+         },
+         "mergingProperty": "email",
+         "overwriteExistingProfiles": true,
+         "propertiesToOverwrite": ["firstName", "lastName"],
+         "active": true
+     }
+    ```
+    
+    Omit the `itemId` when creating new entry, `configType` can be 
'**recurrent**' for file/ftp/network path polling or  '**oneshot**' for one 
time import.
+    
+    The `properties.source` attribute is an Apache Camel endpoint uri (See 
http://camel.apache.org/uris.html for more details). Unomi Router is designed 
to use **File** and **FTP** Camel components. 
+    
+    The attribute `properties.mapping` is a Map of:
+    * Key: Profile property id in Unomi
+    * Value: Index of the column in the imported file to copy the in the 
previous property.
+        
+    The attribute `mergingProperty` is the profile property id in Unomi to use 
to check for duplication.
+    
+    The attribute `propertiesToOverwrite` is a list of profile properties ids 
to overwrite, if **null** all properties
+    will be overwritten.
+    
+    The attribute `active` is the flag to activate or deactivate the import 
configuration.
+    
+    Concerning oneshot import configuration using the previously described 
service will only create the import configuration, to send the file to process
+    you need to call : 
+    
+    `POST /cxs/importConfiguration/oneshot`
+    
+    `Content-Type : multipart/form-data`
+    
+    First multipart with the name '**importConfigId**' is the 
importConfiguration to use to import the file, second one with the name 
'**file**' is the file to import.
+    
+    
+   
+
+    
+    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/router/pom.xml b/extensions/router/pom.xml
new file mode 100644
index 0000000..c69320c
--- /dev/null
+++ b/extensions/router/pom.xml
@@ -0,0 +1,64 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.unomi</groupId>
+        <artifactId>unomi-extensions</artifactId>
+        <version>1.2.0-incubating-dmf_1343-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>unomi-router</artifactId>
+    <name>Apache Unomi :: Extensions :: Router</name>
+    <description>Apache Camel Router for the Apache Unomi Context 
server</description>
+    <packaging>pom</packaging>
+
+    <properties>
+        <camel.version>2.18.3</camel.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
+                        <Import-Package>
+                            sun.misc;resolution:=optional,
+                            *
+                        </Import-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <modules>
+        <module>router-api</module>
+        <module>router-service</module>
+        <module>router-core</module>
+        <module>router-rest</module>
+        <module>router-karaf-feature</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/router/router-api/pom.xml 
b/extensions/router/router-api/pom.xml
new file mode 100644
index 0000000..06207b3
--- /dev/null
+++ b/extensions/router/router-api/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>unomi-router</artifactId>
+        <groupId>org.apache.unomi</groupId>
+        <version>1.2.0-incubating-dmf_1343-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>unomi-router-api</artifactId>
+    <name>Apache Unomi :: Extensions :: Router :: API</name>
+    <description>Router Specification API</description>
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.unomi</groupId>
+            <artifactId>unomi-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
new file mode 100644
index 0000000..127de39
--- /dev/null
+++ 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.api;
+
+import org.apache.unomi.api.Item;
+import org.apache.unomi.api.MetadataItem;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by amidani on 28/04/2017.
+ */
+public class ImportConfiguration extends Item {
+
+    /**
+     * The ImportConfiguration ITEM_TYPE
+     *
+     * @see Item for a discussion of ITEM_TYPE
+     */
+    public static final String ITEM_TYPE = "importConfig";
+    private String name;
+    private String description;
+    private String configType;
+    private Map<String, Object> properties = new HashMap<>();
+    private String mergingProperty;
+    private boolean overwriteExistingProfiles = false;
+    private List<String> propertiesToOverwrite;
+
+    private String columnSeparator = ",";
+    private String lineSeparator = "\n";
+    private boolean active = false;
+
+    /**
+     * Sets the property identified by the specified name to the specified 
value. If a property with that name already exists, replaces its value, 
otherwise adds the new
+     * property with the specified name and value.
+     *
+     * @param name  the name of the property to set
+     * @param value the value of the property
+     */
+    public void setProperty(String name, Object value) {
+        properties.put(name, value);
+    }
+
+    /**
+     * Retrieves the name of the import configuration
+     * @return the name of the import configuration
+     */
+    public String getName() { return this.name; }
+
+    /**
+     * Sets the name of the import configuration
+     * @param name the name of the import configuration
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Retrieves the description of the import configuration
+     * @return the description of the import configuration
+     */
+    public String getDescription() { return this.description; }
+
+    /**
+     * Sets the description of the import configuration
+     * @param description the description of the import configuration
+     */
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+
+    /**
+     * Retrieves the config type of the import configuration
+     * @return the config type of the import configuration
+     */
+    public String getConfigType() { return this.configType; }
+
+    /**
+     * Sets the config type of the import configuration
+     * @param configType the config type of the import configuration
+     */
+    public void setConfigType(String configType) {
+        this.configType = configType;
+    }
+
+    /**
+     * Retrieves the property identified by the specified name.
+     *
+     * @param name the name of the property to retrieve
+     * @return the value of the specified property or {@code null} if no such 
property exists
+     */
+    public Object getProperty(String name) {
+        return properties.get(name);
+    }
+
+    /**
+     * Retrieves a Map of all property name - value pairs for this import 
configuration.
+     *
+     * @return a Map of all property name - value pairs for this import 
configuration
+     */
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    /**
+     * Sets the property name - value pairs for this import configuration.
+     *
+     * @param properties a Map containing the property name - value pairs for 
this import configuration
+     */
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    public String getMergingProperty() {
+        return mergingProperty;
+    }
+
+    /**
+     * Sets the merging property.
+     * @param mergingProperty property used to check if the profile exist when 
merging
+     */
+    public void setMergingProperty(String mergingProperty) {
+        this.mergingProperty = mergingProperty;
+    }
+
+
+    /**
+     * Retrieves the import configuration active flag.
+     *
+     * @return true if the import configuration is active false if not
+     */
+    public boolean isActive() {
+        return this.active;
+    }
+
+    /**
+     * Sets the active flag true/false.
+     *
+     * @param active a boolean to set to active or inactive the import 
configuration
+     */
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    /**
+     * Retrieves the import configuration overwriteExistingProfiles flag.
+     *
+     * @return true if during the import existing profiles must be overwritten
+     */
+    public boolean isOverwriteExistingProfiles() {
+        return this.overwriteExistingProfiles;
+    }
+
+    /**
+     * Sets the overwriteExistingProfiles flag true/false.
+     *
+     * @param overwriteExistingProfiles a boolean to set 
overwriteExistingProfiles in the import configuration
+     */
+    public void setOverwriteExistingProfiles(boolean 
overwriteExistingProfiles) {
+        this.overwriteExistingProfiles = overwriteExistingProfiles;
+    }
+
+    public List<String> getPropertiesToOverwrite() {
+        return propertiesToOverwrite;
+    }
+
+    public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) {
+        this.propertiesToOverwrite = propertiesToOverwrite;
+    }
+
+    /**
+     * gets the column separator.
+     */
+    public String getColumnSeparator() {
+        return this.columnSeparator;
+    }
+
+    /**
+     * Sets the column separator.
+     * @param columnSeparator property used to specify a line separator. 
Defaults to ','
+     */
+    public void setColumnSeparator(String columnSeparator) {
+        if(this.columnSeparator !=null) {
+            this.columnSeparator = columnSeparator;
+        }
+    }
+
+    /**
+     * gets the line separator.
+     */
+    public String getLineSeparator() {
+        return this.lineSeparator;
+    }
+
+    /**
+     * Sets the line separator.
+     * @param lineSeparator property used to specify a line separator. 
Defaults to '\n'
+     */
+    public void setLineSeparator(String lineSeparator) {
+        if(lineSeparator != null) {
+            this.lineSeparator = lineSeparator;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
new file mode 100644
index 0000000..30e40e0
--- /dev/null
+++ 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.router.api;
+
+import org.apache.unomi.api.Profile;
+
+import java.util.List;
+
+/**
+ * An extension of {@link Profile} to handle merge strategy and deletion when 
importing profiles
+ */
+public class ProfileToImport extends Profile {
+
+    private List<String> propertiesToOverwrite;
+    private String mergingProperty;
+    private boolean profileToDelete;
+    private boolean overwriteExistingProfiles;
+
+
+    public List<String> getPropertiesToOverwrite() {
+        return this.propertiesToOverwrite;
+    }
+
+    public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) {
+        this.propertiesToOverwrite = propertiesToOverwrite;
+    }
+
+    public boolean isProfileToDelete() {
+        return this.profileToDelete;
+    }
+
+    public void setProfileToDelete(boolean profileToDelete) {
+        this.profileToDelete = profileToDelete;
+    }
+
+    public boolean isOverwriteExistingProfiles() {
+        return this.overwriteExistingProfiles;
+    }
+
+    /**
+     * Sets the overwriteExistingProfiles flag.
+     * @param overwriteExistingProfiles flag used to specify if we want to 
overwrite existing profiles
+     */
+    public void setOverwriteExistingProfiles(boolean 
overwriteExistingProfiles) {
+        this.overwriteExistingProfiles = overwriteExistingProfiles;
+    }
+
+    public String getMergingProperty() {
+        return this.mergingProperty;
+    }
+
+    /**
+     * Sets the merging property.
+     * @param mergingProperty property used to check if the profile exist when 
merging
+     */
+    public void setMergingProperty(String mergingProperty) {
+        this.mergingProperty = mergingProperty;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
new file mode 100644
index 0000000..cacd671
--- /dev/null
+++ 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.api.services;
+
+import org.apache.unomi.router.api.ImportConfiguration;
+
+import java.util.List;
+
+/**
+ * A service to access and operate on {@link ImportConfiguration}s.
+ */
+public interface ImportConfigurationService {
+
+    /**
+     * Retrieves all the import configurations.
+     *
+     * @return the list of import configurations
+     */
+    public List<ImportConfiguration> getImportConfigurations();
+
+    /**
+     * Retrieves the import configuration identified by the specified 
identifier.
+     *
+     * @param configId the identifier of the profile to retrieve
+     * @return the import configuration identified by the specified identifier 
or
+     *  {@code null} if no such import configuration exists
+     */
+    public ImportConfiguration load(String configId);
+
+    /**
+     * Saves the specified import configuration in the context server.
+     *
+     * @param profile the import configuration to be saved
+     * @return the newly saved import configuration
+     */
+    public ImportConfiguration save(ImportConfiguration profile);
+
+    /**
+     * Deletes the import configuration identified by the specified identifier.
+     *
+     * @param configId the identifier of the import configuration to delete
+     */
+    public void delete(String configId);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
new file mode 100644
index 0000000..aa7d182
--- /dev/null
+++ 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.api.services;
+
+import org.apache.unomi.router.api.ProfileToImport;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Created by amidani on 20/05/2017.
+ */
+public interface ProfileImportService {
+
+    boolean saveMergeDeleteImportedProfile(ProfileToImport profileToImport) 
throws InvocationTargetException, IllegalAccessException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/pom.xml 
b/extensions/router/router-core/pom.xml
new file mode 100644
index 0000000..53780e7
--- /dev/null
+++ b/extensions/router/router-core/pom.xml
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>unomi-router</artifactId>
+        <groupId>org.apache.unomi</groupId>
+        <version>1.2.0-incubating-dmf_1343-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>unomi-router-core</artifactId>
+    <name>Apache Unomi :: Extensions :: Router :: Core</name>
+    <description>Router Core (Apache Camel Routes)</description>
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.unomi</groupId>
+            <artifactId>unomi-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.unomi</groupId>
+            <artifactId>unomi-services</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.unomi</groupId>
+            <artifactId>unomi-router-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+            <version>${camel.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jackson</artifactId>
+            <version>${camel.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-http-common</artifactId>
+            <version>${camel.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-servlet</artifactId>
+            <version>${camel.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+            <version>${camel.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>3.5</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.jsch</artifactId>
+            <version>0.1.54_1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.1.0</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
+                        <Import-Package>
+                            org.osgi.service.event;resolution:=optional,
+                            org.apache.camel,
+                            org.apache.camel.builder,
+                            org.apache.camel.component.file.remote,
+                            org.apache.camel.component.file,
+                            org.apache.camel.component.jackson,
+                            org.apache.camel.component.kafka,
+                            org.apache.camel.component.servlet,
+                            org.apache.camel.component.servlet.osgi,
+                            org.apache.camel.impl,
+                            org.apache.camel.model,
+                            org.apache.camel.model.dataformat,
+                            org.apache.camel.model.rest,
+                            org.apache.camel.spi,
+                            org.apache.unomi.api,
+                            org.apache.unomi.router.api,
+                            org.apache.unomi.api.services,
+                            org.apache.unomi.router.api.services,
+                            
org.apache.kafka.clients.producer;resolution:=optional,
+                            
org.apache.kafka.clients.consumer;resolution:=optional,
+                            com.jcraft.jsch,
+                            org.osgi.framework,
+                            org.osgi.service.http,
+                            org.slf4j
+                        </Import-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>
+                                        
src/main/resources/org.apache.unomi.router.cfg
+                                    </file>
+                                    <type>cfg</type>
+                                    <classifier>routercfg</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
new file mode 100644
index 0000000..df734d3
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.context;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Route;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.services.ImportConfigurationService;
+import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
+import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
+import 
org.apache.unomi.router.core.route.ProfileImportKafkaToUnomiRouteBuilder;
+import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder;
+import 
org.apache.unomi.router.core.route.ProfileImportSourceToKafkaRouteBuilder;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.SynchronousBundleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by amidani on 04/05/2017.
+ */
+public class ProfileImportCamelContext implements SynchronousBundleListener {
+
+    private Logger logger = 
LoggerFactory.getLogger(ProfileImportCamelContext.class.getName());
+
+    private CamelContext camelContext;
+    private UnomiStorageProcessor unomiStorageProcessor;
+    private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
+    private ImportConfigurationService importConfigurationService;
+    private JacksonDataFormat jacksonDataFormat;
+    private String uploadDir;
+    private Map<String, String> kafkaProps;
+
+    private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent";
+
+    private BundleContext bundleContext;
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    public void initCamelContext() throws Exception {
+        logger.info("Initialize Camel Context...");
+        camelContext = new DefaultCamelContext();
+        List<ImportConfiguration> importConfigurationList = 
importConfigurationService.getImportConfigurations();
+        ProfileImportSourceToKafkaRouteBuilder builderReader = new 
ProfileImportSourceToKafkaRouteBuilder(kafkaProps);
+        builderReader.setImportConfigurationList(importConfigurationList);
+        builderReader.setJacksonDataFormat(jacksonDataFormat);
+        builderReader.setContext(camelContext);
+        camelContext.addRoutes(builderReader);
+
+        //One shot import route
+        ProfileImportOneShotRouteBuilder builderOneShot = new 
ProfileImportOneShotRouteBuilder(kafkaProps);
+        
builderOneShot.setImportConfigByFileNameProcessor(importConfigByFileNameProcessor);
+        builderOneShot.setJacksonDataFormat(jacksonDataFormat);
+        builderOneShot.setUploadDir(uploadDir);
+        builderOneShot.setContext(camelContext);
+        camelContext.addRoutes(builderOneShot);
+
+
+        ProfileImportKafkaToUnomiRouteBuilder builderProcessor = new 
ProfileImportKafkaToUnomiRouteBuilder(kafkaProps);
+        builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor);
+        builderProcessor.setJacksonDataFormat(jacksonDataFormat);
+        builderProcessor.setContext(camelContext);
+        camelContext.addRoutes(builderProcessor);
+
+        camelContext.start();
+
+        logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
+
+        processBundleStartup(bundleContext);
+        for (Bundle bundle : bundleContext.getBundles()) {
+            if (bundle.getBundleContext() != null) {
+                processBundleStartup(bundle.getBundleContext());
+            }
+        }
+        bundleContext.addBundleListener(this);
+        logger.info("Camel Context {} initialized successfully.");
+
+    }
+
+    private boolean stopRoute(String routeId) throws Exception {
+        return camelContext.stopRoute(routeId, 10L, TimeUnit.SECONDS, true);
+    }
+
+    public void updateProfileImportReaderRoute(ImportConfiguration 
importConfiguration) throws Exception {
+        Route route = camelContext.getRoute(importConfiguration.getItemId());
+        if(route!=null && stopRoute(importConfiguration.getItemId())) {
+            camelContext.removeRoute(importConfiguration.getItemId());
+        }
+        //Handle transforming an import config oneshot <--> recurrent
+        
if(IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())){
+            ProfileImportSourceToKafkaRouteBuilder builder = new 
ProfileImportSourceToKafkaRouteBuilder(kafkaProps);
+            
builder.setImportConfigurationList(Arrays.asList(importConfiguration));
+            builder.setJacksonDataFormat(jacksonDataFormat);
+            builder.setContext(camelContext);
+            camelContext.addRoutes(builder);
+        }
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setUnomiStorageProcessor(UnomiStorageProcessor 
unomiStorageProcessor) {
+        this.unomiStorageProcessor = unomiStorageProcessor;
+    }
+
+    public void 
setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor 
importConfigByFileNameProcessor) {
+        this.importConfigByFileNameProcessor = importConfigByFileNameProcessor;
+    }
+
+    public void setImportConfigurationService(ImportConfigurationService 
importConfigurationService) {
+        this.importConfigurationService = importConfigurationService;
+    }
+
+    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
+        this.jacksonDataFormat = jacksonDataFormat;
+    }
+
+    public void setUploadDir(String uploadDir) {
+        this.uploadDir = uploadDir;
+    }
+
+    public void setKafkaProps(Map<String, String> kafkaProps) {
+        this.kafkaProps = kafkaProps;
+    }
+
+    public void preDestroy() throws Exception {
+        bundleContext.removeBundleListener(this);
+        //This is to shutdown Camel context
+        //(will stop all routes/components/endpoints etc and clear internal 
state/cache)
+        this.camelContext.stop();
+        logger.info("Camel context for profile import is shutdown.");
+    }
+
+    private void processBundleStartup(BundleContext bundleContext) {
+        if (bundleContext == null) {
+            return;
+        }
+    }
+
+    @Override
+    public void bundleChanged(BundleEvent bundleEvent) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
new file mode 100644
index 0000000..e4eaa19
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.core.context.ProfileImportCamelContext;
+
+/**
+ * Created by amidani on 10/05/2017.
+ */
+public class ConfigUpdateProcessor implements Processor{
+
+    private ProfileImportCamelContext profileImportCamelContext;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (exchange.getIn() != null) {
+            Message message = exchange.getIn();
+            ImportConfiguration importConfiguration = 
message.getBody(ImportConfiguration.class);
+            
profileImportCamelContext.updateProfileImportReaderRoute(importConfiguration);
+        }
+    }
+
+    public void setProfileImportCamelContext(ProfileImportCamelContext 
profileImportCamelContext) {
+        this.profileImportCamelContext = profileImportCamelContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
new file mode 100644
index 0000000..7fc7730
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.services.ImportConfigurationService;
+
+/**
+ * Created by amidani on 22/05/2017.
+ */
+public class ImportConfigByFileNameProcessor implements Processor{
+
+    private ImportConfigurationService importConfigurationService;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+
+        String fileName = 
exchange.getIn().getBody(GenericFile.class).getFileName();
+        String importConfigId = fileName.substring(0, fileName.indexOf('.'));
+        ImportConfiguration importConfiguration = 
importConfigurationService.load(importConfigId);
+        exchange.getIn().setHeader("importConfigOneShot", importConfiguration);
+    }
+
+    public void setImportConfigurationService(ImportConfigurationService 
importConfigurationService) {
+        this.importConfigurationService = importConfigurationService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
new file mode 100644
index 0000000..150ef6d
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.ProfileToImport;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Created by amidani on 29/12/2016.
+ */
+public class LineSplitProcessor implements Processor {
+
+    private Map<String, Integer> fieldsMapping;
+    private List<String> propertiesToOverwrite;
+    private String mergingProperty;
+    private boolean overwriteExistingProfiles;
+    private String columnSeparator;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        //In case of one shot import we check the header and overwrite import 
config
+        ImportConfiguration importConfigOneShot = (ImportConfiguration) 
exchange.getIn().getHeader("importConfigOneShot");
+        if(importConfigOneShot!=null) {
+            fieldsMapping = (Map<String, 
Integer>)importConfigOneShot.getProperties().get("mapping");
+            propertiesToOverwrite = 
importConfigOneShot.getPropertiesToOverwrite();
+            mergingProperty = importConfigOneShot.getMergingProperty();
+            overwriteExistingProfiles = 
importConfigOneShot.isOverwriteExistingProfiles();
+            columnSeparator = importConfigOneShot.getColumnSeparator();
+        }
+        String[] profileData = 
((String)exchange.getIn().getBody()).split(columnSeparator);
+        ProfileToImport profileToImport = new ProfileToImport();
+        profileToImport.setItemId(UUID.randomUUID().toString());
+        profileToImport.setItemType("profile");
+        profileToImport.setScope("system");
+        if(profileData.length > 0) {
+            Map<String, Object> properties = new HashMap<>();
+            for(String fieldMappingKey : fieldsMapping.keySet()) {
+                if(profileData.length > fieldsMapping.get(fieldMappingKey)) {
+                    properties.put(fieldMappingKey, 
profileData[fieldsMapping.get(fieldMappingKey)].trim());
+                }
+            }
+            profileToImport.setProperties(properties);
+            profileToImport.setMergingProperty(mergingProperty);
+            profileToImport.setPropertiesToOverwrite(propertiesToOverwrite);
+            
profileToImport.setOverwriteExistingProfiles(overwriteExistingProfiles);
+            if(StringUtils.isNotBlank(profileData[profileData.length - 1]) && 
Boolean.parseBoolean(profileData[profileData.length - 1].trim())) {
+                profileToImport.setProfileToDelete(true);
+            }
+        }
+        exchange.getIn().setBody(profileToImport, ProfileToImport.class);
+        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
+        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
+    }
+
+    /**
+     * Setter of fieldsMapping
+     * @param fieldsMapping map String,Integer fieldName in ES and the 
matching column index in the import file
+     */
+    public void setFieldsMapping(Map<String, Integer> fieldsMapping) {
+        this.fieldsMapping = fieldsMapping;
+    }
+
+    public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) {
+        this.propertiesToOverwrite = propertiesToOverwrite;
+    }
+
+    public void setOverwriteExistingProfiles(boolean 
overwriteExistingProfiles) {
+        this.overwriteExistingProfiles = overwriteExistingProfiles;
+    }
+
+    public String getMergingProperty() {
+        return this.mergingProperty;
+    }
+
+    /**
+     * Sets the merging property.
+     * @param mergingProperty property used to check if the profile exist when 
merging
+     */
+    public void setMergingProperty(String mergingProperty) {
+        this.mergingProperty = mergingProperty;
+    }
+
+    /**
+     * Sets the line separator.
+     * @param columnSeparator property used to specify a line separator. 
Defaults to ','
+     */
+    public void setColumnSeparator(String columnSeparator) {
+        this.columnSeparator = columnSeparator;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
new file mode 100644
index 0000000..7e55185
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ProfileToImport;
+import org.apache.unomi.router.api.services.ProfileImportService;
+
+/**
+ * Created by amidani on 29/12/2016.
+ */
+public class UnomiStorageProcessor implements Processor {
+
+    private ProfileImportService profileImportService;
+
+    @Override
+    public void process(Exchange exchange)
+            throws Exception {
+        if (exchange.getIn() != null) {
+            Message message = exchange.getIn();
+
+            ProfileToImport profileToImport = (ProfileToImport) 
message.getBody();
+            
profileImportService.saveMergeDeleteImportedProfile(profileToImport);
+        }
+    }
+
+    public void setProfileImportService(ProfileImportService 
profileImportService) {
+        this.profileImportService = profileImportService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java
new file mode 100644
index 0000000..40575d5
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.rest.RestBindingMode;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.core.context.ProfileImportCamelContext;
+import org.apache.unomi.router.core.processor.ConfigUpdateProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by amidani on 10/05/2017.
+ */
+public class ProfileImportConfigUpdateRouteBuilder extends RouteBuilder {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ProfileImportConfigUpdateRouteBuilder.class.getName());
+
+    private ProfileImportCamelContext profileImportCamelContext;
+
+    @Override
+    public void configure() throws Exception {
+        logger.info("Preparing REST Configuration for servlet with context 
path [/importConfigAdmin]");
+        restConfiguration().component("servlet")
+                .contextPath("/importConfigAdmin")
+                .enableCORS(false)
+                .bindingMode(RestBindingMode.json)
+                .dataFormatProperty("prettyPrint", "true");
+
+        
rest().put("/").consumes("application/json").type(ImportConfiguration.class)
+                .to("direct:importConfigRestDeposit");
+        ConfigUpdateProcessor profileImportConfigUpdateProcessor = new 
ConfigUpdateProcessor();
+        
profileImportConfigUpdateProcessor.setProfileImportCamelContext(profileImportCamelContext);
+        from("direct:importConfigRestDeposit")
+                .process(profileImportConfigUpdateProcessor)
+                .transform().constant("Success.")
+                .onException(Exception.class)
+                .transform().constant("Failure!");
+
+
+    }
+
+    public void setProfileImportCamelContext(ProfileImportCamelContext 
profileImportCamelContext) {
+        this.profileImportCamelContext = profileImportCamelContext;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
new file mode 100644
index 0000000..1b056fe
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
+
+import java.util.Map;
+
+/**
+ * Created by amidani on 26/04/2017.
+ */
+public class ProfileImportKafkaToUnomiRouteBuilder extends RouteBuilder {
+
+    private UnomiStorageProcessor unomiStorageProcessor;
+    private JacksonDataFormat jacksonDataFormat;
+    private String kafkaHost;
+    private String kafkaPort;
+    private String kafkaImportTopic;
+    private String kafkaImportGroupId;
+
+    public ProfileImportKafkaToUnomiRouteBuilder(Map<String, String> 
kafkaProps) {
+        kafkaHost = kafkaProps.get("kafkaHost");
+        kafkaPort = kafkaProps.get("kafkaPort");
+        kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
+        kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
+    }
+
+    @Override
+    public void configure() throws Exception {
+
+        StringBuilder kafkaUri = new StringBuilder("kafka:");
+        
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
+        if(StringUtils.isNotBlank(kafkaImportGroupId)) {
+            kafkaUri.append("&groupId="+kafkaImportGroupId);
+        }
+        kafkaUri.append("&autoCommitEnable=true&consumersCount=10");
+        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
+        kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort);
+        kafkaConfiguration.setTopic(kafkaImportTopic);
+        kafkaConfiguration.setGroupId(kafkaImportGroupId);
+        KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
+        endpoint.setConfiguration(kafkaConfiguration);
+        from(endpoint)
+                .unmarshal(jacksonDataFormat)
+                .process(unomiStorageProcessor)
+                .to("log:org.apache.unomi.router?level=INFO");
+
+    }
+
+    public void setUnomiStorageProcessor(UnomiStorageProcessor 
unomiStorageProcessor) {
+        this.unomiStorageProcessor = unomiStorageProcessor;
+    }
+
+    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
+        this.jacksonDataFormat = jacksonDataFormat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
new file mode 100644
index 0000000..d095f3e
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
+import org.apache.unomi.router.core.processor.LineSplitProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Created by amidani on 22/05/2017.
+ */
+public class ProfileImportOneShotRouteBuilder extends RouteBuilder {
+
+    private Logger logger = 
LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName());
+
+    private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
+    private JacksonDataFormat jacksonDataFormat;
+    private String uploadDir;
+    private String kafkaHost;
+    private String kafkaPort;
+    private String kafkaImportTopic;
+    private String kafkaImportGroupId;
+
+    private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
+
+    public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps) {
+        kafkaHost = kafkaProps.get("kafkaHost");
+        kafkaPort = kafkaProps.get("kafkaPort");
+        kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
+        kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
+    }
+
+    @Override
+    public void configure() throws Exception {
+
+        //Prepare Kafka Deposit
+        StringBuilder kafkaUri = new StringBuilder("kafka:");
+        
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
+        if(StringUtils.isNotBlank(kafkaImportGroupId)) {
+            kafkaUri.append("&groupId="+ kafkaImportGroupId);
+        }
+
+        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
+        kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort);
+        kafkaConfiguration.setTopic(kafkaImportTopic);
+        kafkaConfiguration.setGroupId(kafkaImportGroupId);
+        KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
+        endpoint.setConfiguration(kafkaConfiguration);
+
+        LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
+
+
+        from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m")
+                .routeId(IMPORT_ONESHOT_ROUTE_ID)
+                .autoStartup(true)
+                .process(importConfigByFileNameProcessor)
+                
.split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}"))
+                .process(lineSplitProcessor)
+                .to("log:org.apache.unomi.router?level=INFO")
+                .marshal(jacksonDataFormat)
+                .convertBodyTo(String.class)
+                .to(endpoint);
+    }
+
+    public void 
setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor 
importConfigByFileNameProcessor) {
+        this.importConfigByFileNameProcessor = importConfigByFileNameProcessor;
+    }
+
+    public void setUploadDir(String uploadDir) {
+        this.uploadDir = uploadDir;
+    }
+
+    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
+        this.jacksonDataFormat = jacksonDataFormat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
new file mode 100644
index 0000000..37ae59e
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.core.processor.LineSplitProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by amidani on 26/04/2017.
+ */
+
+public class ProfileImportSourceToKafkaRouteBuilder extends RouteBuilder {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ProfileImportSourceToKafkaRouteBuilder.class.getName());
+
+    private List<ImportConfiguration> importConfigurationList;
+    private JacksonDataFormat jacksonDataFormat;
+    private String kafkaHost;
+    private String kafkaPort;
+    private String kafkaImportTopic;
+    private String kafkaImportGroupId;
+
+    public ProfileImportSourceToKafkaRouteBuilder(Map<String, String> 
kafkaProps) {
+        kafkaHost = kafkaProps.get("kafkaHost");
+        kafkaPort = kafkaProps.get("kafkaPort");
+        kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
+        kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
+    }
+
+    @Override
+    public void configure() throws Exception {
+        //Prepare Kafka Deposit
+        StringBuilder kafkaUri = new StringBuilder("kafka:");
+        
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
+        if(StringUtils.isNotBlank(kafkaImportGroupId)) {
+            kafkaUri.append("&groupId="+ kafkaImportGroupId);
+        }
+
+        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
+        kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort);
+        kafkaConfiguration.setTopic(kafkaImportTopic);
+        kafkaConfiguration.setGroupId(kafkaImportGroupId);
+        KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
+        endpoint.setConfiguration(kafkaConfiguration);
+
+        //Loop on multiple import configuration
+        for(ImportConfiguration importConfiguration : importConfigurationList) 
{
+            if(importConfiguration.getProperties().size() > 0 &&
+                    StringUtils.isNotEmpty((String) 
importConfiguration.getProperties().get("source"))) {
+                //Prepare Split Processor
+                LineSplitProcessor lineSplitProcessor = new 
LineSplitProcessor();
+                lineSplitProcessor.setFieldsMapping((Map<String, Integer>) 
importConfiguration.getProperties().get("mapping"));
+                
lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles());
+                
lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite());
+                
lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty());
+                
lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator());
+
+                from((String) 
importConfiguration.getProperties().get("source"))
+                        .routeId(importConfiguration.getItemId())// This allow 
identification of the route for manual start/stop
+                        .autoStartup(importConfiguration.isActive())// 
Auto-start if the import configuration is set active
+                        
.split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator()))
+                        .process(lineSplitProcessor)
+                        .to("log:org.apache.unomi.router?level=INFO")
+                        .marshal(jacksonDataFormat)
+                        .convertBodyTo(String.class)
+                        .to(endpoint);
+            }
+        }
+    }
+
+    public void setImportConfigurationList(List<ImportConfiguration> 
importConfigurationList) {
+        this.importConfigurationList = importConfigurationList;
+    }
+
+    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
+        this.jacksonDataFormat = jacksonDataFormat;
+    }
+
+    public void setKafkaHost(String kafkaHost) {
+        this.kafkaHost = kafkaHost;
+    }
+
+    public void setKafkaPort(String kafkaPort) {
+        this.kafkaPort = kafkaPort;
+    }
+
+    public void setKafkaImportTopic(String kafkaImportTopic) {
+        this.kafkaImportTopic = kafkaImportTopic;
+    }
+
+    public void setKafkaImportGroupId(String kafkaImportGroupId) {
+        this.kafkaImportGroupId = kafkaImportGroupId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
new file mode 100644
index 0000000..4c36b9e
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<blueprint xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+           xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0";
+           
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0";
+           xmlns:camel="http://camel.apache.org/schema/blueprint";
+           xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 
http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
+                               
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0 
http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd
+                               http://camel.apache.org/schema/blueprint 
http://camel.apache.org/schema/blueprint/camel-blueprint.xsd";>
+
+    <cm:property-placeholder persistent-id="org.apache.unomi.router" 
update-strategy="reload">
+        <cm:default-properties>
+            <cm:property name="kafka.host" value="localhost"/>
+            <cm:property name="kafka.port" value="9092"/>
+            <cm:property name="kafka.import.topic" value="camel-deposit"/>
+            <cm:property name="kafka.import.groupId" 
value="unomi-import-group"/>
+            <cm:property name="import.oneshot.uploadDir" 
value="/tmp/oneshot_import_configs/"/>
+        </cm:default-properties>
+    </cm:property-placeholder>
+
+    <bean id="unomiStorageProcessor" 
class="org.apache.unomi.router.core.processor.UnomiStorageProcessor">
+        <property name="profileImportService" ref="profileImportService"/>
+    </bean>
+
+    <bean id="importConfigByFileNameProcessor" 
class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor">
+        <property name="importConfigurationService" 
ref="importConfigurationService"/>
+    </bean>
+
+    <bean id="jacksonDataFormat" 
class="org.apache.camel.component.jackson.JacksonDataFormat">
+        <property name="unmarshalType" 
value="org.apache.unomi.router.api.ProfileToImport"/>
+    </bean>
+
+    <bean id="jacksonDataFormatImportConfig" 
class="org.apache.camel.model.dataformat.JsonDataFormat">
+        <property name="unmarshalType" 
value="org.apache.unomi.router.api.ImportConfiguration"/>
+        <property name="library" value="Jackson"/>
+    </bean>
+
+    <bean class="org.apache.camel.component.servlet.osgi.OsgiServletRegisterer"
+          init-method="register"
+          destroy-method="unregister">
+        <property name="alias" value="/importConfigAdmin"/>
+        <property name="httpService" ref="httpService"/>
+        <property name="servlet" ref="camelServlet"/>
+    </bean>
+
+    <bean id="camelServlet" 
class="org.apache.camel.component.servlet.CamelHttpTransportServlet"/>
+
+
+    <bean id="camelContext" 
class="org.apache.unomi.router.core.context.ProfileImportCamelContext"
+          init-method="initCamelContext" destroy-method="preDestroy">
+        <property name="kafkaProps">
+            <map>
+                <entry key="kafkaHost" value="${kafka.host}"/>
+                <entry key="kafkaPort" value="${kafka.port}"/>
+                <entry key="kafkaImportTopic" value="${kafka.import.topic}"/>
+                <entry key="kafkaImportGroupId" 
value="${kafka.import.groupId}"/>
+            </map>
+        </property>
+        <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
+        <property name="unomiStorageProcessor" ref="unomiStorageProcessor"/>
+        <property name="importConfigByFileNameProcessor" 
ref="importConfigByFileNameProcessor"/>
+        <property name="importConfigurationService" 
ref="importConfigurationService"/>
+        <property name="jacksonDataFormat" ref="jacksonDataFormat"/>
+        <property name="bundleContext" ref="blueprintBundleContext"/>
+    </bean>
+
+    <camel:camelContext id="httpEndpoint" 
xmlns="http://camel.apache.org/schema/blueprint";>
+        <camel:routeBuilder ref="profileImportConfigUpdateRouteBuilder" />
+    </camel:camelContext>
+
+    <bean id="profileImportConfigUpdateRouteBuilder" 
class="org.apache.unomi.router.core.route.ProfileImportConfigUpdateRouteBuilder">
+        <property name="profileImportCamelContext" ref="camelContext"/>
+    </bean>
+
+    <reference id="httpService" interface="org.osgi.service.http.HttpService"/>
+    <reference id="profileImportService" 
interface="org.apache.unomi.router.api.services.ProfileImportService"/>
+    <reference id="importConfigurationService" 
interface="org.apache.unomi.router.api.services.ImportConfigurationService"/>
+
+</blueprint>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
new file mode 100644
index 0000000..ff2c8ef
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#Kafka
+ settingskafka.host=localhost
+kafka.port=9092
+kafka.import.topic=camel-deposit
+kafka.import.groupId=unomi-import-group
+
+#Import One Shot upload directory
+import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/
\ No newline at end of file

Reply via email to