This is an automated email from the ASF dual-hosted git repository.
sergehuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new d6918fc48 UNOMI-888: Javadoc and API cleanup for import/export router
extension (#756)
d6918fc48 is described below
commit d6918fc488f2b8a4767629e779e5442029e4613b
Author: Serge Huber <[email protected]>
AuthorDate: Tue Jun 2 19:50:31 2026 +0200
UNOMI-888: Javadoc and API cleanup for import/export router extension (#756)
---
.../unomi/router/api/IRouterCamelContext.java | 66 ++++++++++-
.../router/api/ImportExportConfiguration.java | 130 ++++++++++++++-------
.../apache/unomi/router/api/ProfileToImport.java | 82 +++++++++++--
.../exceptions/BadProfileDataFormatException.java | 39 ++++++-
.../services/ImportExportConfigurationService.java | 44 ++++++-
.../router/api/services/ProfileExportService.java | 51 +++++++-
.../router/api/services/ProfileImportService.java | 47 +++++++-
.../unomi/router/core/bean/CollectProfileBean.java | 27 ++++-
.../router/core/context/RouterCamelContext.java | 48 +++++++-
.../processor/ExportRouteCompletionProcessor.java | 45 ++++++-
.../processor/ImportConfigByFileNameProcessor.java | 41 ++++++-
.../processor/ImportRouteCompletionProcessor.java | 53 ++++++++-
.../router/core/processor/LineBuildProcessor.java | 29 ++++-
.../core/processor/LineSplitFailureHandler.java | 37 +++++-
.../router/core/processor/LineSplitProcessor.java | 54 ++++++++-
.../core/processor/UnomiStorageProcessor.java | 42 ++++++-
.../route/ProfileExportCollectRouteBuilder.java | 51 +++++++-
.../route/ProfileExportProducerRouteBuilder.java | 50 +++++++-
.../route/ProfileImportFromSourceRouteBuilder.java | 53 ++++++++-
.../route/ProfileImportOneShotRouteBuilder.java | 53 ++++++++-
.../route/ProfileImportToUnomiRouteBuilder.java | 49 +++++++-
.../core/route/RouterAbstractRouteBuilder.java | 73 +++++++++++-
.../strategy/ArrayListAggregationStrategy.java | 33 +++++-
.../strategy/StringLinesAggregationStrategy.java | 32 ++++-
.../unomi/itests/PropertiesUpdateActionIT.java | 2 +-
25 files changed, 1154 insertions(+), 77 deletions(-)
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
index 5ec1adb57..476b36e82 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
@@ -17,15 +17,79 @@
package org.apache.unomi.router.api;
/**
- * Created by amidani on 18/10/2017.
+ * Facade for the Apache Camel runtime used by the Unomi Router extension.
+ * Implementations manage dynamic routes for profile import (from sources such
as Kafka or files)
+ * and profile export (collection and producer pipelines), and expose a
minimal API so callers do not
+ * depend on Camel types unless they choose to.
+ *
+ * <p>Key responsibilities:
+ * <ul>
+ * <li>Removing obsolete Camel route definitions when configurations change
or are deleted</li>
+ * <li>Rebuilding import reader routes after an {@link
org.apache.unomi.router.api.ImportConfiguration} update</li>
+ * <li>Rebuilding export reader routes after an {@link
org.apache.unomi.router.api.ExportConfiguration} update</li>
+ * <li>Optional Camel tracing for troubleshooting route execution</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Typical usage:
+ * <ul>
+ * <li>Management services call update methods when import/export
configuration documents change</li>
+ * <li>Cleanup paths call {@link #killExistingRoute(String, boolean)} to
drop routes whose configs were removed</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public interface IRouterCamelContext {
+ /**
+ * Stops and removes an existing Camel route by id, if it is currently
registered in the context.
+ *
+ * @param routeId Camel route identifier (usually aligned with
import/export configuration id)
+ * @param fireEvent when {@code true}, signals that router lifecycle
events may be emitted; the concrete
+ * implementation defines whether events are fired
(reserved hook for observability)
+ * @throws Exception if Camel fails to remove the route definition
+ */
void killExistingRoute(String routeId, boolean fireEvent) throws Exception;
+ /**
+ * Refreshes the profile import reader route for the given configuration:
removes any existing route with the
+ * same id, loads the {@link
org.apache.unomi.router.api.ImportConfiguration}, and—for recurrent configs—
+ * registers a new route built from current settings.
+ *
+ * @param configId identifier of the import configuration whose reader
route should be updated
+ * @param fireEvent when {@code true}, signals that router lifecycle
events may be emitted after the update
+ * @throws Exception if route removal or registration fails
+ */
void updateProfileImportReaderRoute(String configId, boolean fireEvent)
throws Exception;
+ /**
+ * Refreshes the profile export reader (collect) route for the given
configuration: removes any existing route
+ * with the same id, loads the {@link
org.apache.unomi.router.api.ExportConfiguration}, and—for recurrent
+ * configs—registers a new collect route built from current settings.
+ *
+ * @param configId identifier of the export configuration whose reader
route should be updated
+ * @param fireEvent when {@code true}, signals that router lifecycle
events may be emitted after the update
+ * @throws Exception if route removal or registration fails
+ */
void updateProfileExportReaderRoute(String configId, boolean fireEvent)
throws Exception;
+ /**
+ * Enables or disables Camel route tracing on the underlying {@code
CamelContext} for debugging (message flow,
+ * exchanges). Intended for diagnostics in development or incident
analysis; may have performance impact when on.
+ *
+ * @param tracing {@code true} to enable Camel tracing, {@code false} to
disable
+ */
void setTracing(boolean tracing);
+
+ /**
+ * Returns the underlying Camel context instance.
+ * The API uses {@link Object} so consumers of this module are not
required to depend on Camel at compile time.
+ * Callers that ship Camel may cast to {@code
org.apache.camel.CamelContext}.
+ *
+ * @return the Camel context instance, or {@code null} if not initialized
or not exposed
+ */
+ default Object getCamelContext() {
+ return null;
+ }
}
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java
index 10209bd15..7c9b5e238 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java
@@ -16,15 +16,52 @@
*/
package org.apache.unomi.router.api;
- import org.apache.unomi.api.Item;
+import org.apache.unomi.api.Item;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
- * Created by amidani on 21/06/2017.
+ * Base configuration class for import and export operations in Apache Unomi.
+ * This class serves as the foundation for both ImportConfiguration and
ExportConfiguration,
+ * providing common configuration properties and behaviors needed for data
transfer operations.
+ *
+ * <p>Key features and responsibilities:
+ * <ul>
+ * <li>Defines common configuration properties for import/export
operations</li>
+ * <li>Manages separators and delimiters for CSV-like file formats</li>
+ * <li>Tracks execution status and history</li>
+ * <li>Handles configuration activation/deactivation</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Usage in Unomi:
+ * <ul>
+ * <li>Used by ImportExportConfigurationService to manage data transfer
configurations</li>
+ * <li>Consumed by Camel routes to determine how to process data</li>
+ * <li>Referenced by import/export processors to format data correctly</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Configuration properties include:
+ * <ul>
+ * <li>itemId - persisted identifier used by router services and routes
({@link Item#getItemId()})</li>
+ * <li>name - human-readable display name (not used as the route or
persistence key)</li>
+ * <li>configType - scheduling mode ({@link
RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_RECURRENT} or
+ * {@link RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_ONESHOT})</li>
+ * <li>columnSeparator - character used to separate columns (default:
",")</li>
+ * <li>lineSeparator - character used to separate lines (default: "\n")</li>
+ * <li>multiValueSeparator - character used to separate multiple values
(default: ";")</li>
+ * <li>active - whether the configuration is currently active</li>
+ * <li>status - current status of the configuration</li>
+ * <li>executions - history of execution attempts</li>
+ * </ul>
+ * </p>
+ *
+ * @see org.apache.unomi.router.api.services.ImportExportConfigurationService
+ * @since 1.0
*/
public class ImportExportConfiguration extends Item {
@@ -53,28 +90,32 @@ public class ImportExportConfiguration extends Item {
}
/**
- * Retrieves the name of the import configuration
- * @return the name of the import configuration
+ * Retrieves the display name of this configuration.
+ *
+ * @return the name of this configuration
*/
public String getName() { return this.name; }
/**
- * Sets the name of the import configuration
- * @param name the name of the import configuration
+ * Sets the display name of this configuration.
+ *
+ * @param name the name of this configuration
*/
public void setName(String name) {
this.name = name;
}
/**
- * Retrieves the description of the import configuration
- * @return the description of the import configuration
+ * Retrieves the human-readable description of this configuration.
+ *
+ * @return the description of this configuration
*/
public String getDescription() { return this.description; }
/**
- * Sets the description of the import configuration
- * @param description the description of the import configuration
+ * Sets the human-readable description of this configuration.
+ *
+ * @param description the description of this configuration
*/
public void setDescription(String description) {
this.description = description;
@@ -82,14 +123,18 @@ public class ImportExportConfiguration extends Item {
/**
- * Retrieves the config type of the import configuration
- * @return the config type of the import configuration
+ * Returns the scheduling mode for this configuration ({@code recurrent}
or {@code oneshot}).
+ *
+ * @return {@link RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_RECURRENT} or
+ * {@link RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_ONESHOT}
*/
public String getConfigType() { return this.configType; }
/**
- * Sets the config type of the import configuration
- * @param configType the config type of the import configuration
+ * Sets the scheduling mode for this configuration.
+ *
+ * @param configType {@link
RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_RECURRENT} or
+ * {@link
RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_ONESHOT}
*/
public void setConfigType(String configType) {
this.configType = configType;
@@ -106,45 +151,45 @@ public class ImportExportConfiguration extends Item {
}
/**
- * Retrieves a Map of all property name - value pairs for this import
configuration.
+ * Retrieves a map of all property name/value pairs for this configuration.
*
- * @return a Map of all property name - value pairs for this import
configuration
+ * @return a map of all property name/value pairs for this configuration
*/
public Map<String, Object> getProperties() {
return properties;
}
/**
- * Retrieves the import configuration active flag.
+ * Returns whether this configuration is active (eligible for scheduled or
triggered runs).
*
- * @return true if the import configuration is active false if not
+ * @return {@code true} if this configuration is active, {@code false}
otherwise
*/
public boolean isActive() {
return this.active;
}
/**
- * Sets the active flag true/false.
+ * Sets whether this configuration is active.
*
- * @param active a boolean to set to active or inactive the import
configuration
+ * @param active {@code true} to activate, {@code false} to deactivate
*/
public void setActive(boolean active) {
this.active = active;
}
/**
- * Retrieves the import configuration status for last execution.
+ * Retrieves the status of the last execution for this configuration.
*
- * @return status of the last execution
+ * @return status of the last execution, or {@code null} if none
*/
public String getStatus() {
return this.status;
}
/**
- * Sets status of the last execution.
+ * Sets the status of the last execution for this configuration.
*
- * @param status of the last execution
+ * @param status the status of the last execution
*/
public void setStatus(String status) {
this.status = status;
@@ -159,11 +204,16 @@ public class ImportExportConfiguration extends Item {
}
/**
- * Sets the column separator.
- * @param columnSeparator property used to specify a line separator.
Defaults to ','
+ * Sets the column separator used when reading or writing delimited text
(typically CSV).
+ *
+ * @param columnSeparator the column delimiter; must be exactly one
character. A {@code null} value is ignored and keeps the current separator
+ * @throws IllegalArgumentException if {@code columnSeparator} is empty or
longer than one character
*/
public void setColumnSeparator(String columnSeparator) {
- if(this.columnSeparator !=null) {
+ if (columnSeparator != null) {
+ if (columnSeparator.length() != 1) {
+ throw new IllegalArgumentException("columnSeparator must be
exactly one character");
+ }
this.columnSeparator = columnSeparator;
}
}
@@ -187,9 +237,9 @@ public class ImportExportConfiguration extends Item {
}
/**
- * Gets the multi value separator.
+ * Returns the separator used between multiple values within a single
field.
*
- * @return multiValueSeparator multi value separator
+ * @return the multi-value separator (often {@code ";"})
*/
public String getMultiValueSeparator() {
return this.multiValueSeparator;
@@ -206,9 +256,9 @@ public class ImportExportConfiguration extends Item {
}
/**
- * Gets the multi value delimiter.
+ * Returns the delimiter wrapping multi-valued fields when serialized.
*
- * @return multiValueDelimiter multi value delimiter
+ * @return the multi-value delimiter (may be empty when not used)
*/
public String getMultiValueDelimiter() {
return this.multiValueDelimiter;
@@ -225,8 +275,9 @@ public class ImportExportConfiguration extends Item {
}
/**
- * Retrieves the executions
- * @return executions
+ * Returns the history of execution records for this configuration
(timestamps, counts, errors, etc.).
+ *
+ * @return the list of execution maps; may be empty
*/
public List<Map<String, Object>> getExecutions() {
return this.executions;
@@ -234,8 +285,9 @@ public class ImportExportConfiguration extends Item {
/**
- * Sets the executions
- * @param executions executions
+ * Replaces the execution history for this configuration.
+ *
+ * @param executions the new execution history list
*/
public void setExecutions(List<Map<String, Object>> executions) {
this.executions = executions;
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
index 30e40e0c8..957ac88e9 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java
@@ -22,56 +22,120 @@ import org.apache.unomi.api.Profile;
import java.util.List;
/**
- * An extension of {@link Profile} to handle merge strategy and deletion when
importing profiles
+ * A specialized Profile class designed for import operations in Apache Unomi.
+ * This class extends the standard {@link Profile} with additional properties
and behaviors
+ * specifically needed during the profile import process.
+ *
+ * <p>Key features:
+ * <ul>
+ * <li>Controls which properties should be overwritten during import</li>
+ * <li>Specifies the property used for merging with existing profiles</li>
+ * <li>Handles profile deletion flags</li>
+ * <li>Controls merge vs full-replace behavior for existing profiles (see
{@link #isOverwriteExistingProfiles()})</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Usage in Unomi:
+ * <ul>
+ * <li>Used by import processors to handle profile data</li>
+ * <li>Consumed by ProfileImportService for import operations</li>
+ * <li>Supports different import strategies (merge/overwrite/delete)</li>
+ * </ul>
+ * </p>
+ *
+ * @see Profile
+ * @see org.apache.unomi.router.api.services.ProfileImportService
+ * @since 1.0
*/
public class ProfileToImport extends Profile {
+ /** List of property names that should be overwritten during import */
private List<String> propertiesToOverwrite;
+
+ /** Property used to identify existing profiles for merging */
private String mergingProperty;
+
+ /** Flag indicating if this profile should be deleted */
private boolean profileToDelete;
- private boolean overwriteExistingProfiles;
+ /** Flag controlling whether to overwrite existing profile data */
+ private boolean overwriteExistingProfiles;
+ /**
+ * Gets the list of properties that should be overwritten during import.
+ * These properties will be updated even if they already exist in the
target profile.
+ *
+ * @return list of property names to overwrite
+ */
public List<String> getPropertiesToOverwrite() {
return this.propertiesToOverwrite;
}
+ /**
+ * Sets the list of properties that should be overwritten during import.
+ *
+ * @param propertiesToOverwrite list of property names that should be
overwritten
+ */
public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) {
this.propertiesToOverwrite = propertiesToOverwrite;
}
+ /**
+ * Checks if this profile is marked for deletion.
+ * When true, the matching profile in the system will be deleted rather
than updated.
+ *
+ * @return true if the profile should be deleted, false otherwise
+ */
public boolean isProfileToDelete() {
return this.profileToDelete;
}
+ /**
+ * Sets whether this profile should be deleted during import.
+ *
+ * @param profileToDelete true to mark the profile for deletion, false
otherwise
+ */
public void setProfileToDelete(boolean profileToDelete) {
this.profileToDelete = profileToDelete;
}
+ /**
+ * Indicates whether selective property updates are enabled for existing
profiles.
+ * When {@code true} and {@link #getPropertiesToOverwrite()} is non-empty,
only the listed properties
+ * are updated on a matching profile. Otherwise the entire properties map
is replaced.
+ *
+ * @return {@code true} to apply selective overwrites when {@code
propertiesToOverwrite} is set
+ */
public boolean isOverwriteExistingProfiles() {
return this.overwriteExistingProfiles;
}
/**
- * Sets the overwriteExistingProfiles flag.
- * @param overwriteExistingProfiles flag used to specify if we want to
overwrite existing profiles
+ * Sets whether selective property updates are enabled for existing
profiles.
+ *
+ * @param overwriteExistingProfiles {@code true} to update only {@link
#getPropertiesToOverwrite()}
+ * when that list is non-empty; {@code
false} replaces the full properties map
*/
public void setOverwriteExistingProfiles(boolean
overwriteExistingProfiles) {
this.overwriteExistingProfiles = overwriteExistingProfiles;
}
+ /**
+ * Gets the property name used for identifying existing profiles during
merge operations.
+ * This property is used to match imported profiles with existing ones in
the system.
+ *
+ * @return the name of the property used for profile matching
+ */
public String getMergingProperty() {
return this.mergingProperty;
}
/**
- * Sets the merging property.
- * @param mergingProperty property used to check if the profile exist when
merging
+ * Sets the property name used for identifying existing profiles during
merge operations.
+ *
+ * @param mergingProperty the name of the property to use for profile
matching
*/
public void setMergingProperty(String mergingProperty) {
this.mergingProperty = mergingProperty;
}
-
-
-
}
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java
index 85cf5ea80..c4156e929 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java
@@ -17,18 +17,55 @@
package org.apache.unomi.router.api.exceptions;
/**
- * Created by amidani on 13/06/2017.
+ * Exception thrown when profile import line data cannot be parsed or
converted during import processing.
+ * Indicates issues with CSV structure, column mapping, or property value
conversion on an import line.
+ *
+ * <p>Common scenarios where this exception is thrown:
+ * <ul>
+ * <li>Invalid CSV format or column count mismatch on an import line</li>
+ * <li>Missing required profile fields in the mapping</li>
+ * <li>Property value conversion failures (e.g. unsupported type for a
mapped field)</li>
+ * <li>Malformed multi-value fields</li>
+ * <li>Empty lines in import files</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Usage in Unomi:
+ * <ul>
+ * <li>Thrown by import line processors (e.g. {@code
LineSplitProcessor})</li>
+ * <li>Handled by import route error handlers</li>
+ * </ul>
+ * </p>
+ *
+ * @see org.apache.unomi.router.api.ProfileToImport
+ * @since 1.0
*/
public class BadProfileDataFormatException extends Exception {
+ /**
+ * Constructs a new exception with {@code null} as its detail message.
+ * The cause is not initialized.
+ */
public BadProfileDataFormatException() {
super();
}
+ /**
+ * Constructs a new exception with the specified detail message.
+ * The cause is not initialized.
+ *
+ * @param message the detail message describing the cause of the exception
+ */
public BadProfileDataFormatException(String message) {
super(message);
}
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ *
+ * @param message the detail message describing the cause of the exception
+ * @param cause the underlying cause of the exception
+ */
public BadProfileDataFormatException(String message, Throwable cause) {
super(message, cause);
}
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
index edb103cc5..023741708 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
@@ -24,7 +24,40 @@ import java.util.List;
import java.util.Map;
/**
- * A service to access and operate on {@link ImportConfiguration}s / {@link
ExportConfiguration}s.
+ * Service interface for managing import and export configurations in Apache
Unomi.
+ * This service provides CRUD operations for {@link ImportConfiguration}s and
{@link ExportConfiguration}s,
+ * as well as functionality to manage the lifecycle of data transfer
configurations.
+ *
+ * <p>Key responsibilities:
+ * <ul>
+ * <li>Managing the lifecycle of import/export configurations</li>
+ * <li>Providing CRUD operations for configurations</li>
+ * <li>Coordinating with Camel routes for configuration updates</li>
+ * <li>Tracking configuration changes that need route updates</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Usage in Unomi:
+ * <ul>
+ * <li>Used by REST endpoints to manage import/export configurations</li>
+ * <li>Consumed by Camel routes to get configuration updates</li>
+ * <li>Utilized by admin interfaces for configuration management</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Implementation considerations:
+ * <ul>
+ * <li>Implementations should handle configuration persistence</li>
+ * <li>Thread safety should be considered for concurrent operations</li>
+ * <li>Configuration changes should be properly propagated to running
routes</li>
+ * </ul>
+ * </p>
+ *
+ * @param <T> The type of configuration (ImportConfiguration or
ExportConfiguration)
+ * @see ImportConfiguration
+ * @see ExportConfiguration
+ * @see RouterConstants.CONFIG_CAMEL_REFRESH
+ * @since 1.0
*/
public interface ImportExportConfigurationService<T> {
@@ -38,7 +71,7 @@ public interface ImportExportConfigurationService<T> {
/**
* Retrieves the import/export configuration identified by the specified
identifier.
*
- * @param configId the identifier of the profile to retrieve
+ * @param configId the identifier of the configuration to retrieve
* @return the import/export configuration identified by the specified
identifier or
* {@code null} if no such import/export configuration exists
*/
@@ -61,8 +94,11 @@ public interface ImportExportConfigurationService<T> {
void delete(String configId);
/**
- * Used by camel route system to get the latest changes on configs and
reflect changes on camel routes if necessary
- * @return map of configId per operation to be done in camel
+ * Consumes pending configuration changes for the Camel router layer.
+ * Implementations typically dequeue IDs whose configurations were updated
or removed so that
+ * routes can be refreshed accordingly.
+ *
+ * @return a map from configuration ID to the refresh operation ({@link
RouterConstants.CONFIG_CAMEL_REFRESH})
*/
Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
consumeConfigsToBeRefresh();
}
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java
index dc0d81df2..ccdc3711b 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java
@@ -23,12 +23,61 @@ import org.apache.unomi.router.api.ExportConfiguration;
import java.util.Collection;
/**
- * Created by amidani on 30/06/2017.
+ * Service interface for handling the export of profiles from Apache Unomi.
+ * This service is responsible for extracting profiles based on segment
criteria
+ * and converting them into the appropriate export format (e.g., CSV).
+ *
+ * <p>Key responsibilities:
+ * <ul>
+ * <li>Extracting profiles based on segment criteria</li>
+ * <li>Converting profiles to export format</li>
+ * <li>Handling data formatting and transformation</li>
+ * <li>Managing export file generation</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Usage in Unomi:
+ * <ul>
+ * <li>Called by export route processors to handle profile extraction</li>
+ * <li>Used during scheduled export operations</li>
+ * <li>Integrated with Unomi's segmentation system</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Implementation considerations:
+ * <ul>
+ * <li>Must handle large data sets efficiently</li>
+ * <li>Should implement proper error handling</li>
+ * <li>Must respect profile property formatting</li>
+ * <li>Should handle multi-valued properties</li>
+ * </ul>
+ * </p>
+ *
+ * @see Profile
+ * @see ExportConfiguration
+ * @see PropertyType
+ * @since 1.0
*/
public interface ProfileExportService {
+ /**
+ * Extracts profiles belonging to a specified segment and formats them for
export.
+ * Implementations typically query profiles by segment, build CSV content
(including line separators
+ * between rows), append an execution record to the configuration, and
persist the updated configuration.
+ *
+ * @param exportConfiguration the configuration specifying export
parameters and format
+ * @return CSV (or configured delimited) content for the extracted profiles
+ */
String extractProfilesBySegment(ExportConfiguration exportConfiguration);
+ /**
+ * Converts a single profile to one delimited row according to the export
configuration mapping.
+ * Does not append line separators; callers or export routes add
separators between rows.
+ *
+ * @param profile the profile to convert
+ * @param exportConfiguration the configuration specifying the export
format
+ * @return one row of delimited profile data (no trailing line separator)
+ */
String convertProfileToCSVLine(Profile profile, ExportConfiguration
exportConfiguration);
}
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
index aa7d1829d..0d008bbee 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java
@@ -21,9 +21,54 @@ import org.apache.unomi.router.api.ProfileToImport;
import java.lang.reflect.InvocationTargetException;
/**
- * Created by amidani on 20/05/2017.
+ * Service interface for handling the import of individual profiles into
Apache Unomi.
+ * This service is responsible for the actual processing and storage of
imported profile data,
+ * including merging with existing profiles or creating new ones as needed.
+ *
+ * <p>Key responsibilities:
+ * <ul>
+ * <li>Processing individual profile imports</li>
+ * <li>Merging imported data with existing profiles</li>
+ * <li>Handling profile creation for new imports</li>
+ * <li>Managing profile deletion when specified</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Usage in Unomi:
+ * <ul>
+ * <li>Called by import route processors to handle individual profile
data</li>
+ * <li>Used during batch import operations</li>
+ * <li>Integrated with Unomi's profile management system</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Implementation considerations:
+ * <ul>
+ * <li>Must handle profile merging strategies defined on {@link
ProfileToImport}</li>
+ * <li>Should implement proper error handling</li>
+ * <li>Must maintain data consistency</li>
+ * <li>Expects property values already parsed (type conversion is done
upstream, e.g. by import processors)</li>
+ * </ul>
+ * </p>
+ *
+ * @see ProfileToImport
+ * @see org.apache.unomi.api.Profile
+ * @since 1.0
*/
public interface ProfileImportService {
+ /**
+ * Processes a profile for import, handling the save, merge, or delete
operation as specified.
+ * This method is the core functionality for profile import processing,
determining whether to:
+ * <ul><li>Create a new profile</li>
+ * <li>Merge with an existing profile</li>
+ * <li>Delete an existing profile</li></ul>
+ *
+ * @param profileToImport the profile data to be imported, containing all
necessary information
+ * for the import operation including the operation
type
+ * @return true if the operation was successful, false otherwise
+ * @throws InvocationTargetException if there is an error during property
mapping
+ * @throws IllegalAccessException if there is an error accessing profile
properties
+ */
boolean saveMergeDeleteImportedProfile(ProfileToImport profileToImport)
throws InvocationTargetException, IllegalAccessException;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
index 1ea03eb3a..ae31b6300 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
@@ -22,17 +22,42 @@ import org.apache.unomi.persistence.spi.PersistenceService;
import java.util.List;
/**
- * Created by amidani on 28/06/2017.
+ * A bean that handles the collection of profiles based on segment criteria.
+ * This class provides functionality to extract profiles from Unomi's
persistence
+ * layer based on segment membership.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Segment-based profile extraction via persistence queries</li>
+ * <li>Integration with Unomi's persistence service</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class CollectProfileBean {
private PersistenceService persistenceService;
+ /**
+ * Returns all profiles that belong to the given segment.
+ * <p>
+ * <strong>Note:</strong> the current implementation may load a large
result set into memory; see UNOMI-759.
+ * </p>
+ *
+ * @param segment the segment identifier to match against the {@code "segments"} field
+ * @return profiles for that segment; may be empty, never {@code null}
+ */
public List<Profile> extractProfileBySegment(String segment) {
// TODO: UNOMI-759 avoid loading all profiles in RAM here
return persistenceService.query("segments", segment,null,
Profile.class);
}
+ /**
+ * Sets the persistence service used for profile queries.
+ *
+ * @param persistenceService the Unomi persistence service to use
+ */
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 4d329209d..cfd167d04 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -49,7 +49,25 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
- * Created by amidani on 04/05/2017.
+ * The main Camel context manager for the Unomi Router component.
+ * This class manages the lifecycle of all import and export routes,
+ * handles route configuration updates, and maintains the Camel context.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Initializes and manages the Camel context</li>
+ * <li>Sets up import and export routes</li>
+ * <li>Handles route configuration updates</li>
+ * <li>Manages route lifecycle (start/stop/update)</li>
+ * <li>Supports Kafka ({@link RouterConstants#CONFIG_TYPE_KAFKA}) and
in-process
+ * {@code direct:} endpoints when configured as {@link
RouterConstants#CONFIG_TYPE_NOBROKER}</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Dependency-injection setters on this class are intended for
OSGi/Blueprint wiring and are not part of the
+ * {@link IRouterCamelContext} API surface.</p>
+ *
+ * @since 1.0
*/
public class RouterCamelContext implements IRouterCamelContext {
@@ -79,8 +97,11 @@ public class RouterCamelContext implements
IRouterCamelContext {
private Integer configsRefreshInterval = 1000;
private ScheduledFuture<?> scheduledFuture;
+ /** Reserved event topic identifier for future remove notifications (not
published by the current implementation). */
public static String EVENT_ID_REMOVE =
"org.apache.unomi.router.event.remove";
+ /** Event topic related to import lifecycle (reserved for integrations). */
public static String EVENT_ID_IMPORT =
"org.apache.unomi.router.event.import";
+ /** Event topic related to export lifecycle (reserved for integrations). */
public static String EVENT_ID_EXPORT =
"org.apache.unomi.router.event.export";
public void setExecHistorySize(String execHistorySize) {
@@ -99,10 +120,17 @@ public class RouterCamelContext implements
IRouterCamelContext {
this.configSharingService = configSharingService;
}
+ /** {@inheritDoc} */
+ @Override
public void setTracing(boolean tracing) {
- camelContext.setTracing(true);
+ camelContext.setTracing(tracing);
}
+ /**
+ * Initializes the scheduler, shared config properties, the Camel context,
and import/export routes.
+ *
+ * @throws Exception if Camel or service setup fails
+ */
public void init() throws Exception {
LOGGER.info("Initialize Camel Context...");
scheduler = Executors.newSingleThreadScheduledExecutor();
@@ -116,6 +144,11 @@ public class RouterCamelContext implements
IRouterCamelContext {
LOGGER.info("Camel Context initialized successfully.");
}
+ /**
+ * Stops the configuration refresh scheduler and shuts down the Camel
context (all routes and components).
+ *
+ * @throws Exception if Camel shutdown fails
+ */
public void destroy() throws Exception {
scheduledFuture.cancel(true);
if (scheduler != null) {
@@ -223,6 +256,8 @@ public class RouterCamelContext implements
IRouterCamelContext {
camelContext.start();
}
+ /** {@inheritDoc} */
+ @Override
public void killExistingRoute(String routeId, boolean fireEvent) throws
Exception {
//Active routes
Route route = camelContext.getRoute(routeId);
@@ -234,6 +269,8 @@ public class RouterCamelContext implements
IRouterCamelContext {
}
}
+ /** {@inheritDoc} */
+ @Override
public void updateProfileImportReaderRoute(String configId, boolean
fireEvent) throws Exception {
killExistingRoute(configId, false);
@@ -255,6 +292,8 @@ public class RouterCamelContext implements
IRouterCamelContext {
}
}
+ /** {@inheritDoc} */
+ @Override
public void updateProfileExportReaderRoute(String configId, boolean
fireEvent) throws Exception {
killExistingRoute(configId, false);
@@ -275,6 +314,11 @@ public class RouterCamelContext implements
IRouterCamelContext {
}
}
+ /**
+ * {@inheritDoc}
+ * <p>The concrete type is {@link org.apache.camel.CamelContext}; callers
may narrow the reference safely.</p>
+ */
+ @Override
public CamelContext getCamelContext() {
return camelContext;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
index d9794a8de..7f7a7e776 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
@@ -30,14 +30,47 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Created by amidani on 29/06/2017.
+ * A Camel processor that handles the completion of profile export routes.
+ * This processor updates the export configuration with execution statistics
+ * and manages the execution history of export operations.
+ *
+ * <p>The processor performs the following operations:
+ * <ul>
+ * <li>Records export execution statistics</li>
+ * <li>Updates the export configuration status</li>
+ * <li>Maintains execution history within configured size limits</li>
+ * <li>Persists updated configuration information</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class ExportRouteCompletionProcessor implements Processor {
private static final Logger LOGGER =
LoggerFactory.getLogger(ExportRouteCompletionProcessor.class.getName());
+
+ /** Service for managing export configurations */
private ImportExportConfigurationService<ExportConfiguration>
exportConfigurationService;
+
+ /** Maximum number of execution history entries to maintain */
private int executionsHistorySize;
+ /**
+ * Processes the completion of an export route by updating its
configuration and statistics.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Loads the current export configuration</li>
+ * <li>Creates an execution entry with timestamp and statistics</li>
+ * <li>Updates the configuration with execution results</li>
+ * <li>Maintains the execution history size limit</li>
+ * <li>Updates the export status to complete</li>
+ * </ul>
+ * </p>
+ *
+ * @param exchange the Camel exchange containing export execution details
+ * @throws Exception if an error occurs during processing
+ */
@Override
public void process(Exchange exchange) throws Exception {
// We load the conf from ES because we are going to increment the
execution number
@@ -59,10 +92,20 @@ public class ExportRouteCompletionProcessor implements
Processor {
LOGGER.info("Processing route {} completed.",
exchange.getFromRouteId());
}
+ /**
+ * Sets the service used for managing export configurations.
+ *
+ * @param exportConfigurationService the service for handling export
configurations
+ */
public void
setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration>
exportConfigurationService) {
this.exportConfigurationService = exportConfigurationService;
}
+ /**
+ * Sets the maximum size of the execution history to maintain.
+ *
+ * @param executionsHistorySize the maximum number of execution entries to
keep
+ */
public void setExecutionsHistorySize(int executionsHistorySize) {
this.executionsHistorySize = executionsHistorySize;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
index 61c6ed408..39e5a42d9 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
@@ -26,14 +26,48 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Created by amidani on 22/05/2017.
+ * A Camel processor that retrieves import configurations based on file names.
+ * This processor extracts the configuration ID from the filename and loads
+ * the corresponding import configuration for processing.
+ *
+ * <p>The processor expects filenames in the format
+ * {@code configurationId.extension},
+ * where {@code configurationId} matches an existing import configuration.</p>
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Extracts configuration ID from filename</li>
+ * <li>Loads corresponding import configuration</li>
+ * <li>Sets configuration in exchange header for processing</li>
+ * <li>Handles missing configurations gracefully</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class ImportConfigByFileNameProcessor implements Processor {
private static final Logger LOGGER =
LoggerFactory.getLogger(ImportConfigByFileNameProcessor.class.getName());
+ /** Service for managing import configurations */
private ImportExportConfigurationService<ImportConfiguration>
importConfigurationService;
+ /**
+ * Processes the exchange by loading an import configuration based on the
filename.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Extracts the filename from the exchange body</li>
+ * <li>Parses the configuration ID from the filename</li>
+ * <li>Attempts to load the corresponding import configuration</li>
+ * <li>Sets the configuration in the exchange header if found</li>
+ * <li>Stops route processing if no configuration is found</li>
+ * </ul>
+ * </p>
+ *
+ * @param exchange the Camel exchange containing the file to process
+ * @throws Exception if an error occurs during processing
+ */
@Override
public void process(Exchange exchange) throws Exception {
@@ -49,6 +83,11 @@ public class ImportConfigByFileNameProcessor implements
Processor {
}
}
+ /**
+ * Sets the service used for managing import configurations.
+ *
+ * @param importConfigurationService the service for handling import
configurations
+ */
public void
setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration>
importConfigurationService) {
this.importConfigurationService = importConfigurationService;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
index 7554ab3fe..e34d965b6 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
@@ -26,15 +26,51 @@ import org.slf4j.LoggerFactory;
import java.util.*;
/**
- * Created by amidani on 14/06/2017.
+ * A Camel processor that handles the completion of profile import routes.
+ * This processor manages the final stage of import operations, collecting
statistics,
+ * handling errors, and updating the import configuration with execution
results.
+ *
+ * <p>The processor performs the following operations:
+ * <ul>
+ * <li>Collects import statistics persisted on the configuration (success
and failure counts, plus error details)</li>
+ * <li>Manages error reporting with configurable limits</li>
+ * <li>Updates import configuration status</li>
+ * <li>Maintains execution history</li>
+ * <li>Handles both one-shot and recurring imports</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class ImportRouteCompletionProcessor implements Processor {
private static final Logger LOGGER =
LoggerFactory.getLogger(ImportRouteCompletionProcessor.class.getName());
+
+ /** Service for managing import configurations */
private ImportExportConfigurationService<ImportConfiguration>
importConfigurationService;
+
+ /** Maximum number of execution history entries to maintain */
private int executionsHistorySize;
+
+ /** Maximum number of errors to report per execution */
private int execErrReportSize;
+ /**
+ * Processes the completion of an import route by collecting statistics
and updating configuration.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Identifies the import configuration (one-shot or recurring)</li>
+ * <li>Counts successful and failed imports (unrecognized line types are
skipped and not persisted)</li>
+ * <li>Collects error information up to the configured limit</li>
+ * <li>Updates the import configuration with execution results</li>
+ * <li>Sets the final status based on success/failure counts</li>
+ * </ul>
+ * </p>
+ *
+ * @param exchange the Camel exchange containing import results
+ * @throws Exception if an error occurs during processing
+ */
@Override
public void process(Exchange exchange) throws Exception {
String importConfigId = null;
@@ -89,14 +125,29 @@ public class ImportRouteCompletionProcessor implements
Processor {
LOGGER.info("Processing route {} completed. completion date: {}.",
exchange.getFromRouteId(), new Date());
}
+ /**
+ * Sets the service used for managing import configurations.
+ *
+ * @param importConfigurationService the service for handling import
configurations
+ */
public void
setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration>
importConfigurationService) {
this.importConfigurationService = importConfigurationService;
}
+ /**
+ * Sets the maximum size of the execution history to maintain.
+ *
+ * @param executionsHistorySize the maximum number of execution entries to
keep
+ */
public void setExecutionsHistorySize(int executionsHistorySize) {
this.executionsHistorySize = executionsHistorySize;
}
+ /**
+ * Sets the maximum number of errors to report per execution.
+ *
+ * @param execErrReportSize the maximum number of errors to store per
execution
+ */
public void setExecErrReportSize(int execErrReportSize) {
this.execErrReportSize = execErrReportSize;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
index ecfab189f..f1748cc0a 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
@@ -23,16 +23,43 @@ import org.apache.unomi.router.api.ExportConfiguration;
import org.apache.unomi.router.api.services.ProfileExportService;
/**
- * Created by amidani on 28/06/2017.
+ * A Camel processor that converts Unomi Profile objects into CSV lines for
export.
+ * This processor is responsible for transforming profile data into a
formatted string
+ * according to the export configuration specified in the exchange header.
+ *
+ * <p>The processor works in conjunction with the ProfileExportService to
perform
+ * the actual conversion of profile data to CSV format.</p>
+ *
+ * @since 1.0
*/
public class LineBuildProcessor implements Processor {
private ProfileExportService profileExportService;
+ /**
+ * Constructs a new LineBuildProcessor with the specified
ProfileExportService.
+ *
+ * @param profileExportService the service responsible for converting
profiles to CSV format
+ */
public LineBuildProcessor(ProfileExportService profileExportService) {
this.profileExportService = profileExportService;
}
+ /**
+ * Processes the exchange by converting a Profile object into a CSV line.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Extracts the export configuration from the exchange header</li>
+ * <li>Gets the Profile object from the exchange body</li>
+ * <li>Converts the profile to a CSV line using the
ProfileExportService</li>
+ * <li>Sets the resulting string as the new exchange body</li>
+ * </ul>
+ * </p>
+ *
+ * @param exchange the Camel exchange containing the Profile to convert
and export configuration
+ * @throws Exception if an error occurs during processing
+ */
@Override
public void process(Exchange exchange) throws Exception {
ExportConfiguration exportConfiguration = (ExportConfiguration)
exchange.getIn().getHeader("exportConfig");
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
index 3cf8ff6b6..2f4a5950d 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
@@ -25,12 +25,47 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Created by amidani on 14/06/2017.
+ * A Camel processor that handles failures during the line splitting process
of data import.
+ * This processor is responsible for creating structured error reports when
lines fail to process,
+ * providing detailed information about the nature of the failure and the
problematic data.
+ *
+ * <p>The handler processes different types of exceptions:
+ * <ul>
+ * <li>BadProfileDataFormatException - for data format related errors</li>
+ * <li>General exceptions - capturing the root cause message</li>
+ * </ul>
+ * </p>
+ *
+ * <p>For each failure, it creates an ImportLineError object containing:
+ * <ul>
+ * <li>The error code or message</li>
+ * <li>The content of the failed line</li>
+ * <li>The line number in the source file</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class LineSplitFailureHandler implements Processor {
private static final Logger LOGGER =
LoggerFactory.getLogger(LineSplitFailureHandler.class.getName());
+ /**
+ * Processes failures that occur during line splitting and creates
structured error reports.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Logs the failure details including the route ID and exception</li>
+ * <li>Creates an ImportLineError object with detailed error
information</li>
+ * <li>Extracts the appropriate error message based on the exception
type</li>
+ * <li>Sets the failure information in the exchange for further
processing</li>
+ * </ul>
+ * </p>
+ *
+ * @param exchange the Camel exchange containing the failed message and
exception details
+ * @throws Exception if an error occurs during failure handling
+ */
+ @Override
public void process(Exchange exchange) throws Exception {
LOGGER.error("Route: {}, Error: {}",
exchange.getProperty(Exchange.FAILURE_ROUTE_ID),
exchange.getProperty(Exchange.EXCEPTION_CAUGHT));
ImportLineError importLineError = new ImportLineError();
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
index 265fec8f5..e93d55637 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
@@ -35,25 +35,77 @@ import org.slf4j.LoggerFactory;
import java.util.*;
/**
- * Created by amidani on 29/12/2016.
+ * A Camel processor that splits and processes CSV lines into ProfileToImport
objects.
+ * This processor handles the conversion of CSV data into structured profile
data according
+ * to the import configuration, supporting various data types and multi-value
fields.
+ *
+ * <p>Features include:
+ * <ul>
+ * <li>CSV parsing using the RFC 4180 standard</li>
+ * <li>Support for header rows</li>
+ * <li>Field mapping to profile properties</li>
+ * <li>Multi-value field handling</li>
+ * <li>Type conversion based on property definitions</li>
+ * <li>Profile merging configuration</li>
+ * <li>Delete operation support</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class LineSplitProcessor implements Processor {
private static final Logger LOGGER =
LoggerFactory.getLogger(LineSplitProcessor.class.getName());
+ /** Maps field names to their corresponding column indices */
private Map<String, Integer> fieldsMapping;
+
+ /** List of properties that should be overwritten during import */
private List<String> propertiesToOverwrite;
+
+ /** Property used for merging profiles */
private String mergingProperty;
+
+ /** Whether to overwrite existing profiles during import */
private boolean overwriteExistingProfiles;
+
+ /** Whether the CSV file contains a header row */
private boolean hasHeader;
+
+ /** Whether the CSV file contains a column for delete operations */
private boolean hasDeleteColumn;
+
+ /** Character used to separate columns in the CSV */
private String columnSeparator;
+ /** Character used to separate multiple values within a field */
private String multiValueSeparator;
+
+ /** Characters used to delimit multi-value fields */
private String multiValueDelimiter;
+ /** Collection of property types used for type conversion */
private Collection<PropertyType> profilePropertyTypes;
+ /**
+ * Processes a single line from a CSV file and converts it into a
ProfileToImport object.
+ *
+ * <p>The method performs the following operations:
+ * <ul>
+ * <li>Handles one-shot import configurations if present</li>
+ * <li>Skips header row if configured</li>
+ * <li>Parses the CSV line using the RFC 4180 standard</li>
+ * <li>Validates field mapping against data</li>
+ * <li>Converts fields according to their property types</li>
+ * <li>Handles multi-value fields</li>
+ * <li>Sets up profile merging configuration</li>
+ * <li>Processes delete operations if configured</li>
+ * </ul>
+ * </p>
+ *
+ * @param exchange the Camel exchange containing the CSV line to process
+ * @throws Exception if an error occurs during processing, including
BadProfileDataFormatException
+ */
@Override
public void process(Exchange exchange) throws Exception {
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
index 94737b50f..3caadc878 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java
@@ -28,13 +28,43 @@ import java.util.Map;
import java.util.Set;
/**
- * Created by amidani on 29/12/2016.
+ * A Camel processor that handles the storage of imported profiles in the
Unomi system.
+ * This processor is responsible for managing the final stage of profile
import, including
+ * segment calculation and profile persistence.
+ *
+ * <p>The processor performs the following operations:
+ * <ul>
+ * <li>Processes profiles marked for import</li>
+ * <li>Calculates segments and scores for non-deleted profiles</li>
+ * <li>Updates profile information with calculated segments</li>
+ * <li>Persists profiles in the Unomi storage system</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class UnomiStorageProcessor implements Processor {
+ /** Service for handling profile import operations */
private ProfileImportService profileImportService;
+
+ /** Service for managing profile segments and scoring */
private SegmentService segmentService;
+ /**
+ * Processes the exchange by storing or updating the profile in Unomi's
storage system.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Extracts the ProfileToImport from the message body</li>
+ * <li>For non-delete operations, calculates and updates segments and
scores</li>
+ * <li>Persists the profile using the ProfileImportService</li>
+ * </ul>
+ * </p>
+ *
+ * @param exchange the Camel exchange containing the profile to process
+ * @throws Exception if an error occurs during processing
+ */
@Override
public void process(Exchange exchange)
throws Exception {
@@ -59,10 +89,20 @@ public class UnomiStorageProcessor implements Processor {
}
}
+ /**
+ * Sets the profile import service used for persisting profiles.
+ *
+ * @param profileImportService the service responsible for profile import
operations
+ */
public void setProfileImportService(ProfileImportService
profileImportService) {
this.profileImportService = profileImportService;
}
+ /**
+ * Sets the segment service used for calculating profile segments and
scores.
+ *
+ * @param segmentService the service responsible for segment calculations
+ */
public void setSegmentService(SegmentService segmentService) {
this.segmentService = segmentService;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
index 5529c109b..9a7a351b8 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
@@ -31,19 +31,58 @@ import java.util.List;
import java.util.Map;
/**
- * Created by amidani on 27/06/2017.
+ * A Camel route builder that handles the collection of profiles for export.
+ * This route builder creates routes that periodically collect profiles based
on
+ * segment criteria and prepare them for export processing.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Timer-based profile collection</li>
+ * <li>Segment-based profile filtering</li>
+ * <li>Support for multiple export configurations</li>
+ * <li>Configurable collection intervals</li>
+ * <li>Security through endpoint allowlist</li>
+ * <li>Support for Kafka and in-process {@code direct:} endpoints ({@link
RouterConstants#CONFIG_TYPE_KAFKA} / {@link
RouterConstants#CONFIG_TYPE_NOBROKER})</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class ProfileExportCollectRouteBuilder extends
RouterAbstractRouteBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class);
+ /** List of export configurations to process */
private List<ExportConfiguration> exportConfigurationList;
+
+ /** Service for persisting and retrieving data */
private PersistenceService persistenceService;
+ /**
+ * Constructs a new route builder from the given Kafka properties and endpoint configuration type.
+ *
+ * @param kafkaProps map containing Kafka configuration properties
+ * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link
RouterConstants#CONFIG_TYPE_NOBROKER}
+ */
public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps,
String configType) {
super(kafkaProps, configType);
}
+ /**
+ * Configures the routes for collecting profiles to export.
+ * Creates a route for each export configuration that matches the criteria.
+ *
+ * <p>Each route:
+ * <ul>
+ * <li>Runs on a configured timer schedule</li>
+ * <li>Collects profiles based on segment criteria</li>
+ * <li>Processes profiles for export</li>
+ * <li>Routes data to appropriate endpoints</li>
+ * </ul>
+ * </p>
+ *
+ * @throws Exception if an error occurs during route configuration
+ */
@Override
public void configure() throws Exception {
if (exportConfigurationList == null ||
exportConfigurationList.isEmpty()) {
@@ -93,10 +132,20 @@ public class ProfileExportCollectRouteBuilder extends
RouterAbstractRouteBuilder
}
}
+ /**
+ * Sets the list of export configurations to process.
+ *
+ * @param exportConfigurationList list of export configurations
+ */
public void setExportConfigurationList(List<ExportConfiguration>
exportConfigurationList) {
this.exportConfigurationList = exportConfigurationList;
}
+ /**
+ * Sets the persistence service for data operations.
+ *
+ * @param persistenceService service for persisting and retrieving data
+ */
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
index e11378590..89619b7da 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
@@ -29,24 +29,67 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Created by amidani on 28/06/2017.
+ * A Camel route builder that handles the production of export data from
collected profiles.
+ * This route builder creates routes that process collected profiles and
formats them
+ * for export to the configured destination.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Profile data transformation to export format</li>
+ * <li>Line-by-line processing with aggregation</li>
+ * <li>Support for multiple export destinations</li>
+ * <li>Completion handling and status updates</li>
+ * <li>Support for Kafka and in-process {@code direct:} endpoints ({@link
RouterConstants#CONFIG_TYPE_KAFKA} / {@link
RouterConstants#CONFIG_TYPE_NOBROKER})</li>
+ * </ul>
+ * </p>
+ *
+ * @since 1.0
*/
public class ProfileExportProducerRouteBuilder extends
RouterAbstractRouteBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProfileExportProducerRouteBuilder.class);
+ /** Processor for handling export completion */
private ExportRouteCompletionProcessor exportRouteCompletionProcessor;
+ /** Service for profile export operations */
private ProfileExportService profileExportService;
+ /**
+ * Constructs a new route builder from the given Kafka properties and endpoint configuration type.
+ *
+ * @param kafkaProps map containing Kafka configuration properties
+ * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link
RouterConstants#CONFIG_TYPE_NOBROKER}
+ */
public ProfileExportProducerRouteBuilder(Map<String, String> kafkaProps,
String configType) {
super(kafkaProps, configType);
}
+ /**
+ * Sets the profile export service.
+ *
+ * @param profileExportService service for handling profile exports
+ */
public void setProfileExportService(ProfileExportService
profileExportService) {
this.profileExportService = profileExportService;
}
+ /**
+ * Configures the routes for producing export data.
+ * Creates a route that processes collected profiles and prepares them for
export.
+ *
+ * <p>The route:
+ * <ul>
+ * <li>Unmarshals incoming profile data</li>
+ * <li>Processes profiles into export format</li>
+ * <li>Aggregates lines for batch processing</li>
+ * <li>Handles export completion</li>
+ * <li>Routes data to configured destinations</li>
+ * </ul>
+ *
+ *
+ * @throws Exception if an error occurs during route configuration
+ */
@Override
public void configure() throws Exception {
@@ -69,6 +112,11 @@ public class ProfileExportProducerRouteBuilder extends
RouterAbstractRouteBuilde
}
+ /**
+ * Sets the processor for handling export completion.
+ *
+ * @param exportRouteCompletionProcessor processor for export completion
handling
+ */
public void
setExportRouteCompletionProcessor(ExportRouteCompletionProcessor
exportRouteCompletionProcessor) {
this.exportRouteCompletionProcessor = exportRouteCompletionProcessor;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
index 1ebc5c388..2b24fdbf8 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
@@ -37,20 +37,59 @@ import java.util.List;
import java.util.Map;
/**
- * Created by amidani on 26/04/2017.
+ * A Camel route builder that handles the import of profiles from configured
sources.
+ * This route builder creates routes that process incoming profile data from
various
+ * sources and prepares it for import into Unomi.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Support for multiple import configurations</li>
+ * <li>Line-by-line processing of import data</li>
+ * <li>Error handling and failure reporting</li>
+ * <li>Configuration validation and status updates</li>
+ * <li>Support for Kafka and in-process {@code direct:} endpoints ({@link
RouterConstants#CONFIG_TYPE_KAFKA} / {@link
RouterConstants#CONFIG_TYPE_NOBROKER})</li>
+ * <li>Graceful shutdown handling</li>
+ * </ul>
+ *
+ *
+ * @since 1.0
*/
-
public class ProfileImportFromSourceRouteBuilder extends
RouterAbstractRouteBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName());
+ /** List of import configurations to process */
private List<ImportConfiguration> importConfigurationList;
+
+ /** Service for managing import configurations */
private ImportExportConfigurationService<ImportConfiguration>
importConfigurationService;
+ /**
+ * Constructs a new route builder from the Kafka properties and the router transport mode.
+ *
+ * @param kafkaProps map containing Kafka configuration properties
+ * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link
RouterConstants#CONFIG_TYPE_NOBROKER}
+ */
public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps,
String configType) {
super(kafkaProps, configType);
}
+ /**
+ * Configures the routes for importing profiles from sources.
+ * Creates routes for each import configuration and sets up error handling.
+ *
+ * <p>The routes:
+ * <ul>
+ * <li>Handle data validation and format errors</li>
+ * <li>Process data line by line</li>
+ * <li>Update import status and progress</li>
+ * <li>Route processed data to appropriate endpoints</li>
+ * <li>Manage graceful completion of imports</li>
+ * </ul>
+ *
+ *
+ * @throws Exception if an error occurs during route configuration
+ */
@Override
public void configure() throws Exception {
@@ -132,10 +171,20 @@ public class ProfileImportFromSourceRouteBuilder extends
RouterAbstractRouteBuil
}
}
+ /**
+ * Sets the list of import configurations to process.
+ *
+ * @param importConfigurationList list of import configurations
+ */
public void setImportConfigurationList(List<ImportConfiguration>
importConfigurationList) {
this.importConfigurationList = importConfigurationList;
}
+ /**
+ * Sets the service for managing import configurations.
+ *
+ * @param importConfigurationService service for handling import
configurations
+ */
public void
setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration>
importConfigurationService) {
this.importConfigurationService = importConfigurationService;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
index 863437032..fd9d6c001 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
@@ -30,18 +30,59 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Created by amidani on 22/05/2017.
+ * A Camel route builder that handles one-time profile imports from files.
+ * This route builder creates routes that process CSV files dropped into a
+ * monitored directory for one-time import operations.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>File-based import processing</li>
+ * <li>Configuration lookup from filename</li>
+ * <li>CSV file processing with error handling</li>
+ * <li>Support for Kafka and in-process {@code direct:} endpoints ({@link
RouterConstants#CONFIG_TYPE_KAFKA} / {@link
RouterConstants#CONFIG_TYPE_NOBROKER})</li>
+ * <li>Automatic file movement after processing</li>
+ * <li>Error reporting and failed file handling</li>
+ * </ul>
+ *
+ *
+ * @since 1.0
*/
public class ProfileImportOneShotRouteBuilder extends
RouterAbstractRouteBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName());
+
+ /** Processor for extracting import configuration from filenames */
private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
+
+ /** Directory to monitor for import files */
private String uploadDir;
+ /**
+ * Constructs a new route builder from the Kafka properties and the router transport mode.
+ *
+ * @param kafkaProps map containing Kafka configuration properties
+ * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link
RouterConstants#CONFIG_TYPE_NOBROKER}
+ */
public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps,
String configType) {
super(kafkaProps, configType);
}
+ /**
+ * Configures the route for one-shot profile imports.
+ * Creates a route that monitors a directory for CSV files and processes
them for import.
+ *
+ * <p>The route:
+ * <ul>
+ * <li>Monitors upload directory for CSV files</li>
+ * <li>Extracts configuration from filename</li>
+ * <li>Processes file contents line by line</li>
+ * <li>Handles validation and format errors</li>
+ * <li>Routes processed data to appropriate endpoints</li>
+ * </ul>
+ *
+ *
+ * @throws Exception if an error occurs during route configuration
+ */
@Override
public void configure() throws Exception {
@@ -81,10 +122,20 @@ public class ProfileImportOneShotRouteBuilder extends
RouterAbstractRouteBuilder
}
}
+ /**
+ * Sets the processor for handling import configuration by filename.
+ *
+ * @param importConfigByFileNameProcessor processor for filename-based
configuration
+ */
public void
setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor
importConfigByFileNameProcessor) {
this.importConfigByFileNameProcessor = importConfigByFileNameProcessor;
}
+ /**
+ * Sets the directory to monitor for import files.
+ *
+ * @param uploadDir path to the directory to monitor
+ */
public void setUploadDir(String uploadDir) {
this.uploadDir = uploadDir;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
index ff4942c75..7f31e57bb 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
@@ -29,19 +29,56 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Created by amidani on 26/04/2017.
+ * A Camel route builder that handles the final stage of profile imports by
storing
+ * processed profile data into Apache Unomi's storage system.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Final processing of imported profiles</li>
+ * <li>Integration with Unomi's storage system</li>
+ * <li>Support for Kafka and in-process {@code direct:} endpoints ({@link
RouterConstants#CONFIG_TYPE_KAFKA} / {@link
RouterConstants#CONFIG_TYPE_NOBROKER})</li>
+ * <li>Import completion handling</li>
+ * <li>Error handling and reporting</li>
+ * </ul>
+ *
+ *
+ * @since 1.0
*/
public class ProfileImportToUnomiRouteBuilder extends
RouterAbstractRouteBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName());
+ /** Processor for storing profiles in Unomi */
private UnomiStorageProcessor unomiStorageProcessor;
+
+ /** Processor for handling import completion */
private ImportRouteCompletionProcessor importRouteCompletionProcessor;
+ /**
+ * Constructs a new route builder from the Kafka properties and the router transport mode.
+ *
+ * @param kafkaProps map containing Kafka configuration properties
+ * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link
RouterConstants#CONFIG_TYPE_NOBROKER}
+ */
public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps,
String configType) {
super(kafkaProps, configType);
}
+ /**
+ * Configures the route for storing imported profiles in Unomi.
+ * Creates a route that processes incoming profile data and stores it in
Unomi's storage system.
+ *
+ * <p>The route:
+ * <ul>
+ * <li>Receives processed profile data</li>
+ * <li>Stores profiles in Unomi's storage system</li>
+ * <li>Handles import completion</li>
+ * <li>Manages error reporting</li>
+ * </ul>
+ *
+ *
+ * @throws Exception if an error occurs during route configuration
+ */
@Override
public void configure() throws Exception {
@@ -67,10 +104,20 @@ public class ProfileImportToUnomiRouteBuilder extends
RouterAbstractRouteBuilder
.to("log:org.apache.unomi.router?level=DEBUG");
}
+ /**
+ * Sets the processor for storing profiles in Unomi.
+ *
+ * @param unomiStorageProcessor processor for Unomi storage operations
+ */
public void setUnomiStorageProcessor(UnomiStorageProcessor
unomiStorageProcessor) {
this.unomiStorageProcessor = unomiStorageProcessor;
}
+ /**
+ * Sets the processor for handling import completion.
+ *
+ * @param importRouteCompletionProcessor processor for import completion
operations
+ */
public void
setImportRouteCompletionProcessor(ImportRouteCompletionProcessor
importRouteCompletionProcessor) {
this.importRouteCompletionProcessor = importRouteCompletionProcessor;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
index ad06a00ec..69586990f 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
@@ -28,26 +28,67 @@ import org.apache.unomi.router.api.RouterConstants;
import java.util.Map;
/**
- * Created by amidani on 13/06/2017.
+ * Abstract base class for all Unomi router route builders.
+ * This class provides common functionality and configuration for both import
+ * and export routes, supporting Kafka ({@link
RouterConstants#CONFIG_TYPE_KAFKA}) and in-process
+ * {@code direct:} buffer endpoints when configured as {@link
RouterConstants#CONFIG_TYPE_NOBROKER}.
+ *
+ * <p>Features:
+ * <ul>
+ * <li>Common Kafka configuration handling</li>
+ * <li>Endpoint URI generation for Kafka topics or in-process {@code direct:}
buffers</li>
+ * <li>Shared configuration for JSON data format</li>
+ * <li>Profile service integration</li>
+ * <li>Endpoint security through allowlist</li>
+ * </ul>
+ *
+ *
+ * @since 1.0
*/
public abstract class RouterAbstractRouteBuilder extends RouteBuilder {
+ /** JSON data format configuration */
protected JacksonDataFormat jacksonDataFormat;
+ /** Kafka broker host */
protected String kafkaHost;
+
+ /** Kafka broker port */
protected String kafkaPort;
+
+ /** Topic for import operations */
protected String kafkaImportTopic;
+
+ /** Topic for export operations */
protected String kafkaExportTopic;
+
+ /** Consumer group ID for import operations */
protected String kafkaImportGroupId;
+
+ /** Consumer group ID for export operations */
protected String kafkaExportGroupId;
+
+ /** Number of Kafka consumers */
protected String kafkaConsumerCount;
+
+ /** Auto-commit configuration for Kafka */
protected String kafkaAutoCommit;
+ /** Router transport mode ({@link RouterConstants#CONFIG_TYPE_KAFKA} or
{@link RouterConstants#CONFIG_TYPE_NOBROKER}) */
protected String configType;
+
+ /** Comma-separated list of allowed endpoint schemes */
protected String allowedEndpoints;
+ /** Service for profile operations */
protected ProfileService profileService;
+ /**
+ * Constructs a new route builder from the Kafka properties and the router transport mode.
+ *
+ * @param kafkaProps map containing Kafka configuration properties
+ * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link
RouterConstants#CONFIG_TYPE_NOBROKER}
+ */
public RouterAbstractRouteBuilder(Map<String, String> kafkaProps, String
configType) {
this.kafkaHost = kafkaProps.get("kafkaHost");
this.kafkaPort = kafkaProps.get("kafkaPort");
@@ -60,6 +101,21 @@ public abstract class RouterAbstractRouteBuilder extends
RouteBuilder {
this.configType = configType;
}
+ /**
+ * Gets the appropriate endpoint URI based on configuration type and
operation.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Creates Kafka endpoints with appropriate configuration when using
Kafka</li>
+ * <li>Returns direct endpoint URIs when not using Kafka</li>
+ * <li>Configures consumer properties for incoming endpoints</li>
+ * </ul>
+ *
+ *
+ * @param direction the direction of the endpoint (to/from)
+ * @param operationDepositBuffer the operation buffer identifier
+ * @return either a KafkaEndpoint or a String endpoint URI, depending on the
configuration type
+ */
public Object getEndpointURI(String direction, String
operationDepositBuffer) {
Object endpoint;
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
@@ -91,14 +147,29 @@ public abstract class RouterAbstractRouteBuilder extends
RouteBuilder {
return endpoint;
}
+ /**
+ * Sets the JSON data format configuration.
+ *
+ * @param jacksonDataFormat the JSON data format to use
+ */
public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
this.jacksonDataFormat = jacksonDataFormat;
}
+ /**
+ * Sets the list of allowed endpoint schemes.
+ *
+ * @param allowedEndpoints comma-separated list of allowed endpoint schemes
+ */
public void setAllowedEndpoints(String allowedEndpoints) {
this.allowedEndpoints = allowedEndpoints;
}
+ /**
+ * Sets the profile service.
+ *
+ * @param profileService the service for profile operations
+ */
public void setProfileService(ProfileService profileService) {
this.profileService = profileService;
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
index ca87ad3bd..c113e3aeb 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
@@ -22,11 +22,40 @@ import
org.apache.camel.processor.aggregate.AggregationStrategy;
import java.util.ArrayList;
/**
- * Created by amidani on 16/06/2017.
+ * An implementation of Camel's AggregationStrategy that aggregates exchange
bodies into an ArrayList.
+ * This strategy is useful when you need to collect multiple messages into a
single list for batch processing
+ * or grouped operations within the Unomi Router.
+ *
+ * <p>The strategy maintains the following behavior:
+ * <ul>
+ * <li>For the first message (when oldExchange is null), it creates a new
ArrayList and adds the message body to it</li>
+ * <li>For subsequent messages, it adds the new message body to the existing
ArrayList</li>
+ * </ul>
+ *
+ *
+ * <p>The ArrayList is maintained in the exchange body, allowing for easy
access to all aggregated items
+ * once the aggregation is complete.</p>
+ *
+ * @since 1.0
*/
public class ArrayListAggregationStrategy implements AggregationStrategy {
-
+ /**
+ * Aggregates exchange messages by collecting their bodies into an
ArrayList.
+ *
+ * <p>This method implements the core aggregation logic where:
+ * <ul>
+ * <li>The new exchange's body is extracted as is (maintaining its
original type)</li>
+ * <li>If this is the first message, a new ArrayList is created to store
the messages</li>
+ * <li>The new body is added to the ArrayList</li>
+ * <li>The ArrayList is maintained in the exchange body for subsequent
aggregations</li>
+ * </ul>
+ *
+ *
+ * @param oldExchange the previous exchange being aggregated (may be null
on first invocation)
+ * @param newExchange the current exchange being aggregated (contains the
new item to add to the list)
+ * @return the aggregated exchange containing the ArrayList of all
aggregated items
+ */
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
ArrayList<Object> list = null;
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
index d01859f22..8ccabe687 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
@@ -22,10 +22,40 @@ import org.apache.unomi.router.api.ExportConfiguration;
import org.apache.unomi.router.api.RouterUtils;
/**
- * Created by amidani on 29/06/2017.
+ * An implementation of Camel's AggregationStrategy that combines multiple
text lines into a single string
+ * for export purposes. This strategy is specifically designed to work with
the Unomi Router's export functionality,
+ * where multiple data lines need to be aggregated into a single export file.
+ *
+ * <p>The strategy maintains the following behavior:
+ * <ul>
+ * <li>For the first message (when oldExchange is null), it simply returns
the new exchange</li>
+ * <li>For subsequent messages, it appends the new content to the existing
content using the configured line separator</li>
+ * </ul>
+ *
+ *
+ * <p>The line separator used for aggregation is obtained from the
ExportConfiguration object
+ * stored in the exchange header under the key "exportConfig".</p>
+ *
+ * @since 1.0
*/
public class StringLinesAggregationStrategy implements AggregationStrategy {
+ /**
+ * Aggregates two exchanges by combining their body content with
appropriate line separation.
+ *
+ * <p>This method implements the core aggregation logic where:
+ * <ul>
+ * <li>The new exchange's body is extracted as a String</li>
+ * <li>The line separator is obtained from the export configuration in
the exchange header</li>
+ * <li>If there's an old exchange, the new content is appended to it
with the line separator</li>
+ * <li>If there's no old exchange, the new exchange is returned as
is</li>
+ * </ul>
+ *
+ *
+ * @param oldExchange the previous exchange being aggregated (may be null
on first invocation)
+ * @param newExchange the current exchange being aggregated (contains the
new line to append)
+ * @return the aggregated exchange containing the combined content
+ */
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody(String.class);
String lineSeparator = newExchange.getIn().getHeader("exportConfig",
ExportConfiguration.class).getLineSeparator();
diff --git
a/itests/src/test/java/org/apache/unomi/itests/PropertiesUpdateActionIT.java
b/itests/src/test/java/org/apache/unomi/itests/PropertiesUpdateActionIT.java
index 51368c503..70a732e15 100644
--- a/itests/src/test/java/org/apache/unomi/itests/PropertiesUpdateActionIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/PropertiesUpdateActionIT.java
@@ -78,7 +78,7 @@ public class PropertiesUpdateActionIT extends BaseIT {
}
@Test
- public void testUpdateProperties_CurrentProfile() {
+ public void testUpdateProperties_CurrentProfile() throws
InterruptedException {
Profile profile = profileService.load(PROFILE_TARGET_TEST_ID);
Assert.assertNull(profile.getProperty("firstName"));