This is an automated email from the ASF dual-hosted git repository. sergehuber pushed a commit to branch UNOMI-888-import-export-javadoc in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 51aa61c9fd49040c5093ded5bbdc1c9bfc4a747d Author: Serge Huber <[email protected]> AuthorDate: Wed May 6 08:33:53 2026 +0200 UNOMI-888: Javadoc and API cleanup for import/export router extension Add and expand Javadoc across router-api and router-core: IRouterCamelContext facade, ImportExportConfiguration and services, ProfileToImport, route builders, processors, aggregation strategies, and related exceptions. Also include small behavioral and API fixes discovered during documentation: - IRouterCamelContext: document methods and add getCamelContext() returning Object to avoid a compile-time Camel dependency for API consumers - RouterCamelContext: honor setTracing(boolean) (previously always enabled tracing); add @Override and {@inheritDoc} for interface methods - ImportExportConfiguration: fix setColumnSeparator to null-check the parameter, consistent with setLineSeparator - LineSplitFailureHandler: add @Override on process(Exchange) Replaces placeholder 'Created by' class comments with module-oriented descriptions. --- .../unomi/router/api/IRouterCamelContext.java | 65 ++++++++++- .../router/api/ImportExportConfiguration.java | 122 ++++++++++++++------- .../apache/unomi/router/api/ProfileToImport.java | 80 ++++++++++++-- .../exceptions/BadProfileDataFormatException.java | 41 ++++++- .../services/ImportExportConfigurationService.java | 44 +++++++- .../router/api/services/ProfileExportService.java | 57 +++++++++- .../router/api/services/ProfileImportService.java | 47 +++++++- .../unomi/router/core/bean/CollectProfileBean.java | 28 ++++- .../router/core/context/RouterCamelContext.java | 48 +++++++- .../processor/ExportRouteCompletionProcessor.java | 45 +++++++- .../processor/ImportConfigByFileNameProcessor.java | 41 ++++++- .../processor/ImportRouteCompletionProcessor.java | 53 ++++++++- .../router/core/processor/LineBuildProcessor.java | 29 ++++- .../core/processor/LineSplitFailureHandler.java | 37 ++++++- .../router/core/processor/LineSplitProcessor.java | 54 ++++++++- .../core/processor/UnomiStorageProcessor.java | 42 ++++++- .../route/ProfileExportCollectRouteBuilder.java | 51 ++++++++- .../route/ProfileExportProducerRouteBuilder.java | 50 ++++++++- .../route/ProfileImportFromSourceRouteBuilder.java | 53 ++++++++- .../route/ProfileImportOneShotRouteBuilder.java | 53 ++++++++- .../route/ProfileImportToUnomiRouteBuilder.java | 49 ++++++++- .../core/route/RouterAbstractRouteBuilder.java | 72 +++++++++++- .../strategy/ArrayListAggregationStrategy.java | 33 +++++- .../strategy/StringLinesAggregationStrategy.java | 32 +++++- 24 files changed, 1150 insertions(+), 76 deletions(-) diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java index 5ec1adb57..b525ddc47 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java @@ -17,15 +17,78 @@ package org.apache.unomi.router.api; /** - * Created by amidani on 18/10/2017. + * Facade for the Apache Camel runtime used by the Unomi Router extension. + * Implementations manage dynamic routes for profile import (from sources such as Kafka or files) + * and profile export (collection and producer pipelines), and expose a minimal API so callers do not + * depend on Camel types unless they choose to. + * + * <p>Key responsibilities: + * <ul> + * <li>Removing obsolete Camel route definitions when configurations change or are deleted</li> + * <li>Rebuilding import reader routes after an {@link org.apache.unomi.router.api.ImportConfiguration} update</li> + * <li>Rebuilding export reader routes after an {@link org.apache.unomi.router.api.ExportConfiguration} update</li> + * <li>Optional Camel tracing for troubleshooting route execution</li> + * </ul> + * </p> + * + * <p>Typical usage: + * <ul> + * <li>Management services call update methods when import/export configuration documents change</li> + * <li>Cleanup paths call {@link #killExistingRoute(String, boolean)} to drop routes whose configs were removed</li> + * </ul> + * </p> + * + * @see org.apache.unomi.router.core.context.RouterCamelContext + * @since 1.0 */ public interface IRouterCamelContext { + /** + * Stops and removes an existing Camel route by id, if it is currently registered in the context. + * + * @param routeId Camel route identifier (usually aligned with import/export configuration id) + * @param fireEvent when {@code true}, signals that router lifecycle events may be emitted; the concrete + * implementation defines whether events are fired (reserved hook for observability) + * @throws Exception if Camel fails to remove the route definition + */ void killExistingRoute(String routeId, boolean fireEvent) throws Exception; + /** + * Refreshes the profile import reader route for the given configuration: removes any existing route with the + * same id, loads the {@link org.apache.unomi.router.api.ImportConfiguration}, and—for recurrent configs— + * registers a new route built from current settings. + * + * @param configId identifier of the import configuration whose reader route should be updated + * @param fireEvent when {@code true}, signals that router lifecycle events may be emitted after the update + * @throws Exception if route removal or registration fails + */ void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception; + /** + * Refreshes the profile export reader (collect) route for the given configuration: removes any existing route + * with the same id, loads the {@link org.apache.unomi.router.api.ExportConfiguration}, and—for recurrent + * configs—registers a new collect route built from current settings. + * + * @param configId identifier of the export configuration whose reader route should be updated + * @param fireEvent when {@code true}, signals that router lifecycle events may be emitted after the update + * @throws Exception if route removal or registration fails + */ void updateProfileExportReaderRoute(String configId, boolean fireEvent) throws Exception; + /** + * Enables or disables Camel route tracing on the underlying {@code CamelContext} for debugging (message flow, + * exchanges). Intended for diagnostics in development or incident analysis; may have performance impact when on. + * + * @param tracing {@code true} to enable Camel tracing, {@code false} to disable + */ void setTracing(boolean tracing); + + /** + * Returns the underlying Camel context instance. + * The API uses {@link Object} so consumers of this module are not required to depend on Camel at compile time. + * Callers that ship Camel may cast to {@code org.apache.camel.CamelContext}. + * + * @return the Camel context instance, or {@code null} if not initialized + */ + Object getCamelContext(); } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java index 10209bd15..1695b922d 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java @@ -16,15 +16,50 @@ */ package org.apache.unomi.router.api; - import org.apache.unomi.api.Item; +import org.apache.unomi.api.Item; - import java.util.ArrayList; - import java.util.HashMap; - import java.util.List; - import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** - * Created by amidani on 21/06/2017. + * Base configuration class for import and export operations in Apache Unomi. + * This class serves as the foundation for both ImportConfiguration and ExportConfiguration, + * providing common configuration properties and behaviors needed for data transfer operations. + * + * <p>Key features and responsibilities: + * <ul> + * <li>Defines common configuration properties for import/export operations</li> + * <li>Manages separators and delimiters for CSV-like file formats</li> + * <li>Tracks execution status and history</li> + * <li>Handles configuration activation/deactivation</li> + * </ul> + * </p> + * + * <p>Usage in Unomi: + * <ul> + * <li>Used by ImportExportConfigurationService to manage data transfer configurations</li> + * <li>Consumed by Camel routes to determine how to process data</li> + * <li>Referenced by import/export processors to format data correctly</li> + * </ul> + * </p> + * + * <p>Configuration properties include: + * <ul> + * <li>name - unique identifier for the configuration</li> + * <li>configType - type of configuration (import/export)</li> + * <li>columnSeparator - character used to separate columns (default: ",")</li> + * <li>lineSeparator - character used to separate lines (default: "\n")</li> + * <li>multiValueSeparator - character used to separate multiple values (default: ";")</li> + * <li>active - whether the configuration is currently active</li> + * <li>status - current status of the configuration</li> + * <li>executions - history of execution attempts</li> + * </ul> + * </p> + * + * @see org.apache.unomi.router.api.services.ImportExportConfigurationService + * @since 1.0 */ public class ImportExportConfiguration extends Item { @@ -53,28 +88,32 @@ public class ImportExportConfiguration extends Item { } /** - * Retrieves the name of the import configuration - * @return the name of the import configuration + * Retrieves the display name of this configuration. + * + * @return the name of this configuration */ public String getName() { return this.name; } /** - * Sets the name of the import configuration - * @param name the name of the import configuration + * Sets the display name of this configuration. + * + * @param name the name of this configuration */ public void setName(String name) { this.name = name; } /** - * Retrieves the description of the import configuration - * @return the description of the import configuration + * Retrieves the human-readable description of this configuration. + * + * @return the description of this configuration */ public String getDescription() { return this.description; } /** - * Sets the description of the import configuration - * @param description the description of the import configuration + * Sets the human-readable description of this configuration. + * + * @param description the description of this configuration */ public void setDescription(String description) { this.description = description; @@ -82,14 +121,16 @@ public class ImportExportConfiguration extends Item { /** - * Retrieves the config type of the import configuration - * @return the config type of the import configuration + * Retrieves the configuration type (for example import vs export semantics used by the router). + * + * @return the config type of this configuration */ public String getConfigType() { return this.configType; } /** - * Sets the config type of the import configuration - * @param configType the config type of the import configuration + * Sets the configuration type. + * + * @param configType the config type for this configuration */ public void setConfigType(String configType) { this.configType = configType; @@ -106,45 +147,45 @@ public class ImportExportConfiguration extends Item { } /** - * Retrieves a Map of all property name - value pairs for this import configuration. + * Retrieves a map of all property name/value pairs for this configuration. * - * @return a Map of all property name - value pairs for this import configuration + * @return a map of all property name/value pairs for this configuration */ public Map<String, Object> getProperties() { return properties; } /** - * Retrieves the import configuration active flag. + * Returns whether this configuration is active (eligible for scheduled or triggered runs). * - * @return true if the import configuration is active false if not + * @return {@code true} if this configuration is active, {@code false} otherwise */ public boolean isActive() { return this.active; } /** - * Sets the active flag true/false. + * Sets whether this configuration is active. * - * @param active a boolean to set to active or inactive the import configuration + * @param active {@code true} to activate, {@code false} to deactivate */ public void setActive(boolean active) { this.active = active; } /** - * Retrieves the import configuration status for last execution. + * Retrieves the status of the last execution for this configuration. * - * @return status of the last execution + * @return status of the last execution, or {@code null} if none */ public String getStatus() { return this.status; } /** - * Sets status of the last execution. + * Sets the status of the last execution for this configuration. * - * @param status of the last execution + * @param status the status of the last execution */ public void setStatus(String status) { this.status = status; @@ -159,11 +200,12 @@ public class ImportExportConfiguration extends Item { } /** - * Sets the column separator. - * @param columnSeparator property used to specify a line separator. Defaults to ',' + * Sets the column separator used when reading or writing delimited text (typically CSV). + * + * @param columnSeparator the column delimiter; defaults to {@code ","} when not overridden */ public void setColumnSeparator(String columnSeparator) { - if(this.columnSeparator !=null) { + if (columnSeparator != null) { this.columnSeparator = columnSeparator; } } @@ -187,9 +229,9 @@ public class ImportExportConfiguration extends Item { } /** - * Gets the multi value separator. + * Returns the separator used between multiple values within a single field. * - * @return multiValueSeparator multi value separator + * @return the multi-value separator (often {@code ";"}) */ public String getMultiValueSeparator() { return this.multiValueSeparator; @@ -206,9 +248,9 @@ public class ImportExportConfiguration extends Item { } /** - * Gets the multi value delimiter. + * Returns the delimiter wrapping multi-valued fields when serialized. * - * @return multiValueDelimiter multi value delimiter + * @return the multi-value delimiter (may be empty when not used) */ public String getMultiValueDelimiter() { return this.multiValueDelimiter; @@ -225,8 +267,9 @@ public class ImportExportConfiguration extends Item { } /** - * Retrieves the executions - * @return executions + * Returns the history of execution records for this configuration (timestamps, counts, errors, etc.). + * + * @return the list of execution maps; may be empty */ public List<Map<String, Object>> getExecutions() { return this.executions; @@ -234,8 +277,9 @@ public class ImportExportConfiguration extends Item { /** - * Sets the executions - * @param executions executions + * Replaces the execution history for this configuration. + * + * @param executions the new execution history list */ public void setExecutions(List<Map<String, Object>> executions) { this.executions = executions; diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java index 30e40e0c8..bb87cd375 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java @@ -22,56 +22,118 @@ import org.apache.unomi.api.Profile; import java.util.List; /** - * An extension of {@link Profile} to handle merge strategy and deletion when importing profiles + * A specialized Profile class designed for import operations in Apache Unomi. + * This class extends the standard {@link Profile} with additional properties and behaviors + * specifically needed during the profile import process. + * + * <p>Key features: + * <ul> + * <li>Controls which properties should be overwritten during import</li> + * <li>Specifies the property used for merging with existing profiles</li> + * <li>Handles profile deletion flags</li> + * <li>Controls overwrite behavior for existing profiles</li> + * </ul> + * </p> + * + * <p>Usage in Unomi: + * <ul> + * <li>Used by import processors to handle profile data</li> + * <li>Consumed by ProfileImportService for import operations</li> + * <li>Supports different import strategies (merge/overwrite/delete)</li> + * </ul> + * </p> + * + * @see Profile + * @see org.apache.unomi.router.api.services.ProfileImportService + * @since 1.0 */ public class ProfileToImport extends Profile { + /** List of property names that should be overwritten during import */ private List<String> propertiesToOverwrite; + + /** Property used to identify existing profiles for merging */ private String mergingProperty; + + /** Flag indicating if this profile should be deleted */ private boolean profileToDelete; - private boolean overwriteExistingProfiles; + /** Flag controlling whether to overwrite existing profile data */ + private boolean overwriteExistingProfiles; + /** + * Gets the list of properties that should be overwritten during import. + * These properties will be updated even if they already exist in the target profile. + * + * @return list of property names to overwrite + */ public List<String> getPropertiesToOverwrite() { return this.propertiesToOverwrite; } + /** + * Sets the list of properties that should be overwritten during import. + * + * @param propertiesToOverwrite list of property names that should be overwritten + */ public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) { this.propertiesToOverwrite = propertiesToOverwrite; } + /** + * Checks if this profile is marked for deletion. + * When true, the matching profile in the system will be deleted rather than updated. + * + * @return true if the profile should be deleted, false otherwise + */ public boolean isProfileToDelete() { return this.profileToDelete; } + /** + * Sets whether this profile should be deleted during import. + * + * @param profileToDelete true to mark the profile for deletion, false otherwise + */ public void setProfileToDelete(boolean profileToDelete) { this.profileToDelete = profileToDelete; } + /** + * Checks if existing profiles should be overwritten during import. + * When true, all properties of existing profiles will be overwritten with imported data. + * + * @return true if existing profiles should be overwritten, false for selective updates + */ public boolean isOverwriteExistingProfiles() { return this.overwriteExistingProfiles; } /** - * Sets the overwriteExistingProfiles flag. - * @param overwriteExistingProfiles flag used to specify if we want to overwrite existing profiles + * Sets whether existing profiles should be completely overwritten during import. + * + * @param overwriteExistingProfiles true to overwrite all properties, false for selective updates */ public void setOverwriteExistingProfiles(boolean overwriteExistingProfiles) { this.overwriteExistingProfiles = overwriteExistingProfiles; } + /** + * Gets the property name used for identifying existing profiles during merge operations. + * This property is used to match imported profiles with existing ones in the system. + * + * @return the name of the property used for profile matching + */ public String getMergingProperty() { return this.mergingProperty; } /** - * Sets the merging property. - * @param mergingProperty property used to check if the profile exist when merging + * Sets the property name used for identifying existing profiles during merge operations. + * + * @param mergingProperty the name of the property to use for profile matching */ public void setMergingProperty(String mergingProperty) { this.mergingProperty = mergingProperty; } - - - } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java index 85cf5ea80..731e377e5 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java @@ -17,18 +17,57 @@ package org.apache.unomi.router.api.exceptions; /** - * Created by amidani on 13/06/2017. + * Exception thrown when profile data cannot be properly parsed or formatted during import/export operations. + * This exception indicates issues with the structure or content of profile data that prevent it from being + * properly processed by the Unomi router. + * + * <p>Common scenarios where this exception is thrown: + * <ul> + * <li>Invalid CSV format in import files</li> + * <li>Missing required profile fields</li> + * <li>Incorrect data types for profile properties</li> + * <li>Malformed multi-value fields</li> + * <li>Invalid date formats</li> + * </ul> + * </p> + * + * <p>Usage in Unomi: + * <ul> + * <li>Thrown by profile import processors</li> + * <li>Used in data validation steps</li> + * <li>Caught by error handling routes</li> + * </ul> + * </p> + * + * @see org.apache.unomi.router.api.ProfileToImport + * @since 1.0 */ public class BadProfileDataFormatException extends Exception { + /** + * Constructs a new exception with {@code null} as its detail message. + * The cause is not initialized. + */ public BadProfileDataFormatException() { super(); } + /** + * Constructs a new exception with the specified detail message. + * The cause is not initialized. + * + * @param message the detail message describing the cause of the exception + */ public BadProfileDataFormatException(String message) { super(message); } + /** + * Constructs a new exception with the specified detail message and cause. + * + * @param message the detail message describing the cause of the exception + * @param cause the underlying cause of the exception + */ public BadProfileDataFormatException(String message, Throwable cause) { super(message, cause); } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java index edb103cc5..023741708 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java @@ -24,7 +24,40 @@ import java.util.List; import java.util.Map; /** - * A service to access and operate on {@link ImportConfiguration}s / {@link ExportConfiguration}s. + * Service interface for managing import and export configurations in Apache Unomi. + * This service provides CRUD operations for {@link ImportConfiguration}s and {@link ExportConfiguration}s, + * as well as functionality to manage the lifecycle of data transfer configurations. + * + * <p>Key responsibilities: + * <ul> + * <li>Managing the lifecycle of import/export configurations</li> + * <li>Providing CRUD operations for configurations</li> + * <li>Coordinating with Camel routes for configuration updates</li> + * <li>Tracking configuration changes that need route updates</li> + * </ul> + * </p> + * + * <p>Usage in Unomi: + * <ul> + * <li>Used by REST endpoints to manage import/export configurations</li> + * <li>Consumed by Camel routes to get configuration updates</li> + * <li>Utilized by admin interfaces for configuration management</li> + * </ul> + * </p> + * + * <p>Implementation considerations: + * <ul> + * <li>Implementations should handle configuration persistence</li> + * <li>Thread safety should be considered for concurrent operations</li> + * <li>Configuration changes should be properly propagated to running routes</li> + * </ul> + * </p> + * + * @param <T> The type of configuration (ImportConfiguration or ExportConfiguration) + * @see ImportConfiguration + * @see ExportConfiguration + * @see RouterConstants.CONFIG_CAMEL_REFRESH + * @since 1.0 */ public interface ImportExportConfigurationService<T> { @@ -38,7 +71,7 @@ public interface ImportExportConfigurationService<T> { /** * Retrieves the import/export configuration identified by the specified identifier. * - * @param configId the identifier of the profile to retrieve + * @param configId the identifier of the configuration to retrieve * @return the import/export configuration identified by the specified identifier or * {@code null} if no such import/export configuration exists */ @@ -61,8 +94,11 @@ public interface ImportExportConfigurationService<T> { void delete(String configId); /** - * Used by camel route system to get the latest changes on configs and reflect changes on camel routes if necessary - * @return map of configId per operation to be done in camel + * Consumes pending configuration changes for the Camel router layer. + * Implementations typically dequeue IDs whose configurations were updated or removed so that + * routes can be refreshed accordingly. + * + * @return a map from configuration ID to the refresh operation ({@link RouterConstants.CONFIG_CAMEL_REFRESH}) */ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> consumeConfigsToBeRefresh(); } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java index dc0d81df2..37b921c78 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java @@ -23,12 +23,67 @@ import org.apache.unomi.router.api.ExportConfiguration; import java.util.Collection; /** - * Created by amidani on 30/06/2017. + * Service interface for handling the export of profiles from Apache Unomi. + * This service is responsible for extracting profiles based on segment criteria + * and converting them into the appropriate export format (e.g., CSV). + * + * <p>Key responsibilities: + * <ul> + * <li>Extracting profiles based on segment criteria</li> + * <li>Converting profiles to export format</li> + * <li>Handling data formatting and transformation</li> + * <li>Managing export file generation</li> + * </ul> + * </p> + * + * <p>Usage in Unomi: + * <ul> + * <li>Called by export route processors to handle profile extraction</li> + * <li>Used during scheduled export operations</li> + * <li>Integrated with Unomi's segmentation system</li> + * </ul> + * </p> + * + * <p>Implementation considerations: + * <ul> + * <li>Must handle large data sets efficiently</li> + * <li>Should implement proper error handling</li> + * <li>Must respect profile property formatting</li> + * <li>Should handle multi-valued properties</li> + * </ul> + * </p> + * + * @see Profile + * @see ExportConfiguration + * @see PropertyType + * @since 1.0 */ public interface ProfileExportService { + /** + * Extracts profiles belonging to a specified segment and formats them for export. + * This method handles the bulk export operation, including: + * - Querying profiles based on segment criteria + * - Formatting profiles according to export configuration + * - Generating the export content + * + * @param exportConfiguration the configuration specifying export parameters and format + * @return a String containing the formatted export data + */ String extractProfilesBySegment(ExportConfiguration exportConfiguration); + /** + * Converts a single profile to a CSV line format according to the export configuration. + * This method handles the formatting of individual profiles, including: + * - Property selection and ordering + * - Value formatting + * - Multi-value handling + * - Line separator management + * + * @param profile the profile to convert + * @param exportConfiguration the configuration specifying the export format + * @return a String containing the CSV-formatted profile data + */ String convertProfileToCSVLine(Profile profile, ExportConfiguration exportConfiguration); } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java index aa7d1829d..fc1fe931c 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java @@ -21,9 +21,54 @@ import org.apache.unomi.router.api.ProfileToImport; import java.lang.reflect.InvocationTargetException; /** - * Created by amidani on 20/05/2017. + * Service interface for handling the import of individual profiles into Apache Unomi. + * This service is responsible for the actual processing and storage of imported profile data, + * including merging with existing profiles or creating new ones as needed. + * + * <p>Key responsibilities: + * <ul> + * <li>Processing individual profile imports</li> + * <li>Merging imported data with existing profiles</li> + * <li>Handling profile creation for new imports</li> + * <li>Managing profile deletion when specified</li> + * </ul> + * </p> + * + * <p>Usage in Unomi: + * <ul> + * <li>Called by import route processors to handle individual profile data</li> + * <li>Used during batch import operations</li> + * <li>Integrated with Unomi's profile management system</li> + * </ul> + * </p> + * + * <p>Implementation considerations: + * <ul> + * <li>Must handle profile merging strategies</li> + * <li>Should implement proper error handling</li> + * <li>Must maintain data consistency</li> + * <li>Should handle property type conversions</li> + * </ul> + * </p> + * + * @see ProfileToImport + * @see org.apache.unomi.api.Profile + * @since 1.0 */ public interface ProfileImportService { + /** + * Processes a profile for import, handling the save, merge, or delete operation as specified. + * This method is the core functionality for profile import processing, determining whether to: + * - Create a new profile + * - Merge with an existing profile + * - Delete an existing profile + * + * @param profileToImport the profile data to be imported, containing all necessary information + * for the import operation including the operation type + * @return true if the operation was successful, false otherwise + * @throws InvocationTargetException if there is an error during property mapping + * @throws IllegalAccessException if there is an error accessing profile properties + */ boolean saveMergeDeleteImportedProfile(ProfileToImport profileToImport) throws InvocationTargetException, IllegalAccessException; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java index 1ea03eb3a..43eb8f61d 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java @@ -22,17 +22,43 @@ import org.apache.unomi.persistence.spi.PersistenceService; import java.util.List; /** - * Created by amidani on 28/06/2017. + * A bean that handles the collection of profiles based on segment criteria. + * This class provides functionality to extract profiles from Unomi's persistence + * layer based on segment membership. + * + * <p>Features: + * <ul> + * <li>Segment-based profile extraction</li> + * <li>Integration with Unomi's persistence service</li> + * <li>Batch profile retrieval capabilities</li> + * </ul> + * </p> + * + * @since 1.0 */ public class CollectProfileBean { private PersistenceService persistenceService; + /** + * Returns all profiles that belong to the given segment. + * <p> + * <strong>Note:</strong> the current implementation may load a large result set into memory; see UNOMI-759. + * </p> + * + * @param segment the segment identifier to match (stored index {@code "segments"}) + * @return profiles for that segment; may be empty, never {@code null} + */ public List<Profile> extractProfileBySegment(String segment) { // TODO: UNOMI-759 avoid loading all profiles in RAM here return persistenceService.query("segments", segment,null, Profile.class); } + /** + * Sets the persistence service used for profile queries. + * + * @param persistenceService the Unomi persistence service to use + */ public void setPersistenceService(PersistenceService persistenceService) { this.persistenceService = persistenceService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java index 4d329209d..ae23e5ebc 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java @@ -49,7 +49,25 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** - * Created by amidani on 04/05/2017. + * The main Camel context manager for the Unomi Router component. + * This class manages the lifecycle of all import and export routes, + * handles route configuration updates, and maintains the Camel context. + * + * <p>Features: + * <ul> + * <li>Initializes and manages the Camel context</li> + * <li>Sets up import and export routes</li> + * <li>Handles route configuration updates</li> + * <li>Manages route lifecycle (start/stop/update)</li> + * <li>Provides monitoring through event notifications</li> + * <li>Supports both Kafka and direct endpoints</li> + * </ul> + * </p> + * + * <p>Dependency-injection setters on this class are intended for OSGi/Blueprint wiring and are not part of the + * {@link IRouterCamelContext} API surface.</p> + * + * @since 1.0 */ public class RouterCamelContext implements IRouterCamelContext { @@ -79,8 +97,11 @@ public class RouterCamelContext implements IRouterCamelContext { private Integer configsRefreshInterval = 1000; private ScheduledFuture<?> scheduledFuture; + /** Event topic fired when a router configuration or route is removed (reserved for integrations). */ public static String EVENT_ID_REMOVE = "org.apache.unomi.router.event.remove"; + /** Event topic related to import lifecycle (reserved for integrations). */ public static String EVENT_ID_IMPORT = "org.apache.unomi.router.event.import"; + /** Event topic related to export lifecycle (reserved for integrations). */ public static String EVENT_ID_EXPORT = "org.apache.unomi.router.event.export"; public void setExecHistorySize(String execHistorySize) { @@ -99,10 +120,17 @@ public class RouterCamelContext implements IRouterCamelContext { this.configSharingService = configSharingService; } + /** {@inheritDoc} */ + @Override public void setTracing(boolean tracing) { - camelContext.setTracing(true); + camelContext.setTracing(tracing); } + /** + * Initializes the scheduler, shared config properties, the Camel context, and import/export routes. + * + * @throws Exception if Camel or service setup fails + */ public void init() throws Exception { LOGGER.info("Initialize Camel Context..."); scheduler = Executors.newSingleThreadScheduledExecutor(); @@ -116,6 +144,11 @@ public class RouterCamelContext implements IRouterCamelContext { LOGGER.info("Camel Context initialized successfully."); } + /** + * Stops the configuration refresh scheduler and shuts down the Camel context (all routes and components). + * + * @throws Exception if Camel shutdown fails + */ public void destroy() throws Exception { scheduledFuture.cancel(true); if (scheduler != null) { @@ -223,6 +256,8 @@ public class RouterCamelContext implements IRouterCamelContext { camelContext.start(); } + /** {@inheritDoc} */ + @Override public void killExistingRoute(String routeId, boolean fireEvent) throws Exception { //Active routes Route route = camelContext.getRoute(routeId); @@ -234,6 +269,8 @@ public class RouterCamelContext implements IRouterCamelContext { } } + /** {@inheritDoc} */ + @Override public void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception { killExistingRoute(configId, false); @@ -255,6 +292,8 @@ public class RouterCamelContext implements IRouterCamelContext { } } + /** {@inheritDoc} */ + @Override public void updateProfileExportReaderRoute(String configId, boolean fireEvent) throws Exception { killExistingRoute(configId, false); @@ -275,6 +314,11 @@ public class RouterCamelContext implements IRouterCamelContext { } } + /** + * {@inheritDoc} + * <p>The concrete type is {@link org.apache.camel.CamelContext}; callers may narrow the reference safely.</p> + */ + @Override public CamelContext getCamelContext() { return camelContext; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java index d9794a8de..7f7a7e776 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java @@ -30,14 +30,47 @@ import java.util.HashMap; import java.util.Map; /** - * Created by amidani on 29/06/2017. + * A Camel processor that handles the completion of profile export routes. + * This processor updates the export configuration with execution statistics + * and manages the execution history of export operations. + * + * <p>The processor performs the following operations: + * <ul> + * <li>Records export execution statistics</li> + * <li>Updates the export configuration status</li> + * <li>Maintains execution history within configured size limits</li> + * <li>Persists updated configuration information</li> + * </ul> + * </p> + * + * @since 1.0 */ public class ExportRouteCompletionProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(ExportRouteCompletionProcessor.class.getName()); + + /** Service for managing export configurations */ private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService; + + /** Maximum number of execution history entries to maintain */ private int executionsHistorySize; + /** + * Processes the completion of an export route by updating its configuration and statistics. + * + * <p>This method: + * <ul> + * <li>Loads the current export configuration</li> + * <li>Creates an execution entry with timestamp and statistics</li> + * <li>Updates the configuration with execution results</li> + * <li>Maintains the execution history size limit</li> + * <li>Updates the export status to complete</li> + * </ul> + * </p> + * + * @param exchange the Camel exchange containing export execution details + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { // We load the conf from ES because we are going to increment the execution number @@ -59,10 +92,20 @@ public class ExportRouteCompletionProcessor implements Processor { LOGGER.info("Processing route {} completed.", exchange.getFromRouteId()); } + /** + * Sets the service used for managing export configurations. + * + * @param exportConfigurationService the service for handling export configurations + */ public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) { this.exportConfigurationService = exportConfigurationService; } + /** + * Sets the maximum size of the execution history to maintain. + * + * @param executionsHistorySize the maximum number of execution entries to keep + */ public void setExecutionsHistorySize(int executionsHistorySize) { this.executionsHistorySize = executionsHistorySize; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java index 61c6ed408..39e5a42d9 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java @@ -26,14 +26,48 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Created by amidani on 22/05/2017. + * A Camel processor that retrieves import configurations based on file names. + * This processor extracts the configuration ID from the filename and loads + * the corresponding import configuration for processing. + * + * <p>The processor expects filenames in the format: + * <pre>configurationId.extension</pre> + * where the configurationId matches an existing import configuration.</p> + * + * <p>Features: + * <ul> + * <li>Extracts configuration ID from filename</li> + * <li>Loads corresponding import configuration</li> + * <li>Sets configuration in exchange header for processing</li> + * <li>Handles missing configurations gracefully</li> + * </ul> + * </p> + * + * @since 1.0 */ public class ImportConfigByFileNameProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(ImportConfigByFileNameProcessor.class.getName()); + /** Service for managing import configurations */ private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; + /** + * Processes the exchange by loading an import configuration based on the filename. + * + * <p>This method: + * <ul> + * <li>Extracts the filename from the exchange body</li> + * <li>Parses the configuration ID from the filename</li> + * <li>Attempts to load the corresponding import configuration</li> + * <li>Sets the configuration in the exchange header if found</li> + * <li>Stops route processing if no configuration is found</li> + * </ul> + * </p> + * + * @param exchange the Camel exchange containing the file to process + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { @@ -49,6 +83,11 @@ public class ImportConfigByFileNameProcessor implements Processor { } } + /** + * Sets the service used for managing import configurations. + * + * @param importConfigurationService the service for handling import configurations + */ public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { this.importConfigurationService = importConfigurationService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java index 7554ab3fe..76e82e0d8 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java @@ -26,15 +26,51 @@ import org.slf4j.LoggerFactory; import java.util.*; /** - * Created by amidani on 14/06/2017. + * A Camel processor that handles the completion of profile import routes. + * This processor manages the final stage of import operations, collecting statistics, + * handling errors, and updating the import configuration with execution results. + * + * <p>The processor performs the following operations: + * <ul> + * <li>Collects import statistics (success, failure, ignore counts)</li> + * <li>Manages error reporting with configurable limits</li> + * <li>Updates import configuration status</li> + * <li>Maintains execution history</li> + * <li>Handles both one-shot and recurring imports</li> + * </ul> + * </p> + * + * @since 1.0 */ public class ImportRouteCompletionProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(ImportRouteCompletionProcessor.class.getName()); + + /** Service for managing import configurations */ private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; + + /** Maximum number of execution history entries to maintain */ private int executionsHistorySize; + + /** Maximum number of errors to report per execution */ private int execErrReportSize; + /** + * Processes the completion of an import route by collecting statistics and updating configuration. + * + * <p>This method: + * <ul> + * <li>Identifies the import configuration (one-shot or recurring)</li> + * <li>Counts successful, failed, and ignored imports</li> + * <li>Collects error information up to the configured limit</li> + * <li>Updates the import configuration with execution results</li> + * <li>Sets the final status based on success/failure counts</li> + * </ul> + * </p> + * + * @param exchange the Camel exchange containing import results + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { String importConfigId = null; @@ -89,14 +125,29 @@ public class ImportRouteCompletionProcessor implements Processor { LOGGER.info("Processing route {} completed. completion date: {}.", exchange.getFromRouteId(), new Date()); } + /** + * Sets the service used for managing import configurations. + * + * @param importConfigurationService the service for handling import configurations + */ public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { this.importConfigurationService = importConfigurationService; } + /** + * Sets the maximum size of the execution history to maintain. + * + * @param executionsHistorySize the maximum number of execution entries to keep + */ public void setExecutionsHistorySize(int executionsHistorySize) { this.executionsHistorySize = executionsHistorySize; } + /** + * Sets the maximum number of errors to report per execution. + * + * @param execErrReportSize the maximum number of errors to store per execution + */ public void setExecErrReportSize(int execErrReportSize) { this.execErrReportSize = execErrReportSize; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java index ecfab189f..f1748cc0a 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java @@ -23,16 +23,43 @@ import org.apache.unomi.router.api.ExportConfiguration; import org.apache.unomi.router.api.services.ProfileExportService; /** - * Created by amidani on 28/06/2017. + * A Camel processor that converts Unomi Profile objects into CSV lines for export. + * This processor is responsible for transforming profile data into a formatted string + * according to the export configuration specified in the exchange header. + * + * <p>The processor works in conjunction with the ProfileExportService to perform + * the actual conversion of profile data to CSV format.</p> + * + * @since 1.0 */ public class LineBuildProcessor implements Processor { private ProfileExportService profileExportService; + /** + * Constructs a new LineBuildProcessor with the specified ProfileExportService. + * + * @param profileExportService the service responsible for converting profiles to CSV format + */ public LineBuildProcessor(ProfileExportService profileExportService) { this.profileExportService = profileExportService; } + /** + * Processes the exchange by converting a Profile object into a CSV line. + * + * <p>This method: + * <ul> + * <li>Extracts the export configuration from the exchange header</li> + * <li>Gets the Profile object from the exchange body</li> + * <li>Converts the profile to a CSV line using the ProfileExportService</li> + * <li>Sets the resulting string as the new exchange body</li> + * </ul> + * </p> + * + * @param exchange the Camel exchange containing the Profile to convert and export configuration + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { ExportConfiguration exportConfiguration = (ExportConfiguration) exchange.getIn().getHeader("exportConfig"); diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java index 3cf8ff6b6..2f4a5950d 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java @@ -25,12 +25,47 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Created by amidani on 14/06/2017. + * A Camel processor that handles failures during the line splitting process of data import. + * This processor is responsible for creating structured error reports when lines fail to process, + * providing detailed information about the nature of the failure and the problematic data. + * + * <p>The handler processes different types of exceptions: + * <ul> + * <li>BadProfileDataFormatException - for data format related errors</li> + * <li>General exceptions - capturing the root cause message</li> + * </ul> + * </p> + * + * <p>For each failure, it creates an ImportLineError object containing: + * <ul> + * <li>The error code or message</li> + * <li>The content of the failed line</li> + * <li>The line number in the source file</li> + * </ul> + * </p> + * + * @since 1.0 */ public class LineSplitFailureHandler implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(LineSplitFailureHandler.class.getName()); + /** + * Processes failures that occur during line splitting and creates structured error reports. + * + * <p>This method: + * <ul> + * <li>Logs the failure details including the route ID and exception</li> + * <li>Creates an ImportLineError object with detailed error information</li> + * <li>Extracts the appropriate error message based on the exception type</li> + * <li>Sets the failure information in the exchange for further processing</li> + * </ul> + * </p> + * + * @param exchange the Camel exchange containing the failed message and exception details + * @throws Exception if an error occurs during failure handling + */ + @Override public void process(Exchange exchange) throws Exception { LOGGER.error("Route: {}, Error: {}", exchange.getProperty(Exchange.FAILURE_ROUTE_ID), exchange.getProperty(Exchange.EXCEPTION_CAUGHT)); ImportLineError importLineError = new ImportLineError(); diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java index 265fec8f5..e93d55637 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -35,25 +35,77 @@ import org.slf4j.LoggerFactory; import java.util.*; /** - * Created by amidani on 29/12/2016. + * A Camel processor that splits and processes CSV lines into ProfileToImport objects. + * This processor handles the conversion of CSV data into structured profile data according + * to the import configuration, supporting various data types and multi-value fields. + * + * <p>Features include: + * <ul> + * <li>CSV parsing using RFC4180 standard</li> + * <li>Support for header rows</li> + * <li>Field mapping to profile properties</li> + * <li>Multi-value field handling</li> + * <li>Type conversion based on property definitions</li> + * <li>Profile merging configuration</li> + * <li>Delete operation support</li> + * </ul> + * </p> + * + * @since 1.0 */ public class LineSplitProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(LineSplitProcessor.class.getName()); + /** Maps field names to their corresponding column indices */ private Map<String, Integer> fieldsMapping; + + /** List of properties that should be overwritten during import */ private List<String> propertiesToOverwrite; + + /** Property used for merging profiles */ private String mergingProperty; + + /** Whether to overwrite existing profiles during import */ private boolean overwriteExistingProfiles; + + /** Whether the CSV file contains a header row */ private boolean hasHeader; + + /** Whether the CSV file contains a column for delete operations */ private boolean hasDeleteColumn; + + /** Character used to separate columns in the CSV */ private String columnSeparator; + /** Character used to separate multiple values within a field */ private String multiValueSeparator; + + /** Characters used to delimit multi-value fields */ private String multiValueDelimiter; + /** Collection of property types used for type conversion */ private Collection<PropertyType> profilePropertyTypes; + /** + * Processes a single line from a CSV file and converts it into a ProfileToImport object. + * + * <p>The method performs the following operations: + * <ul> + * <li>Handles one-shot import configurations if present</li> + * <li>Skips header row if configured</li> + * <li>Parses CSV line using RFC4180 standard</li> + * <li>Validates field mapping against data</li> + * <li>Converts fields according to their property types</li> + * <li>Handles multi-value fields</li> + * <li>Sets up profile merging configuration</li> + * <li>Processes delete operations if configured</li> + * </ul> + * </p> + * + * @param exchange the Camel exchange containing the CSV line to process + * @throws Exception if an error occurs during processing, including BadProfileDataFormatException + */ @Override public void process(Exchange exchange) throws Exception { diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java index 94737b50f..3caadc878 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java @@ -28,13 +28,43 @@ import java.util.Map; import java.util.Set; /** - * Created by amidani on 29/12/2016. + * A Camel processor that handles the storage of imported profiles in the Unomi system. + * This processor is responsible for managing the final stage of profile import, including + * segment calculation and profile persistence. + * + * <p>The processor performs the following operations: + * <ul> + * <li>Processes profiles marked for import</li> + * <li>Calculates segments and scores for non-deleted profiles</li> + * <li>Updates profile information with calculated segments</li> + * <li>Persists profiles in the Unomi storage system</li> + * </ul> + * </p> + * + * @since 1.0 */ public class UnomiStorageProcessor implements Processor { + /** Service for handling profile import operations */ private ProfileImportService profileImportService; + + /** Service for managing profile segments and scoring */ private SegmentService segmentService; + /** + * Processes the exchange by storing or updating the profile in Unomi's storage system. + * + * <p>This method: + * <ul> + * <li>Extracts the ProfileToImport from the message body</li> + * <li>For non-delete operations, calculates and updates segments and scores</li> + * <li>Persists the profile using the ProfileImportService</li> + * </ul> + * </p> + * + * @param exchange the Camel exchange containing the profile to process + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { @@ -59,10 +89,20 @@ public class UnomiStorageProcessor implements Processor { } } + /** + * Sets the profile import service used for persisting profiles. + * + * @param profileImportService the service responsible for profile import operations + */ public void setProfileImportService(ProfileImportService profileImportService) { this.profileImportService = profileImportService; } + /** + * Sets the segment service used for calculating profile segments and scores. + * + * @param segmentService the service responsible for segment calculations + */ public void setSegmentService(SegmentService segmentService) { this.segmentService = segmentService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java index 5529c109b..3c690a36e 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java @@ -31,19 +31,58 @@ import java.util.List; import java.util.Map; /** - * Created by amidani on 27/06/2017. + * A Camel route builder that handles the collection of profiles for export. + * This route builder creates routes that periodically collect profiles based on + * segment criteria and prepare them for export processing. + * + * <p>Features: + * <ul> + * <li>Timer-based profile collection</li> + * <li>Segment-based profile filtering</li> + * <li>Support for multiple export configurations</li> + * <li>Configurable collection intervals</li> + * <li>Security through endpoint allowlist</li> + * <li>Support for both Kafka and direct endpoints</li> + * </ul> + * </p> + * + * @since 1.0 */ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class); + /** List of export configurations to process */ private List<ExportConfiguration> exportConfigurationList; + + /** Service for persisting and retrieving data */ private PersistenceService persistenceService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType the type of configuration (kafka/direct) + */ public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the routes for collecting profiles to export. + * Creates a route for each export configuration that matches the criteria. + * + * <p>Each route: + * <ul> + * <li>Runs on a configured timer schedule</li> + * <li>Collects profiles based on segment criteria</li> + * <li>Processes profiles for export</li> + * <li>Routes data to appropriate endpoints</li> + * </ul> + * </p> + * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { if (exportConfigurationList == null || exportConfigurationList.isEmpty()) { @@ -93,10 +132,20 @@ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder } } + /** + * Sets the list of export configurations to process. + * + * @param exportConfigurationList list of export configurations + */ public void setExportConfigurationList(List<ExportConfiguration> exportConfigurationList) { this.exportConfigurationList = exportConfigurationList; } + /** + * Sets the persistence service for data operations. + * + * @param persistenceService service for persisting and retrieving data + */ public void setPersistenceService(PersistenceService persistenceService) { this.persistenceService = persistenceService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java index e11378590..cc903e84a 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java @@ -29,24 +29,67 @@ import org.slf4j.LoggerFactory; import java.util.Map; /** - * Created by amidani on 28/06/2017. + * A Camel route builder that handles the production of export data from collected profiles. + * This route builder creates routes that process collected profiles and formats them + * for export to the configured destination. + * + * <p>Features: + * <ul> + * <li>Profile data transformation to export format</li> + * <li>Line-by-line processing with aggregation</li> + * <li>Support for multiple export destinations</li> + * <li>Completion handling and status updates</li> + * <li>Support for both Kafka and direct endpoints</li> + * </ul> + * </p> + * + * @since 1.0 */ public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileExportProducerRouteBuilder.class); + /** Processor for handling export completion */ private ExportRouteCompletionProcessor exportRouteCompletionProcessor; + /** Service for profile export operations */ private ProfileExportService profileExportService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType the type of configuration (kafka/direct) + */ public ProfileExportProducerRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Sets the profile export service. + * + * @param profileExportService service for handling profile exports + */ public void setProfileExportService(ProfileExportService profileExportService) { this.profileExportService = profileExportService; } + /** + * Configures the routes for producing export data. + * Creates a route that processes collected profiles and prepares them for export. + * + * <p>The route: + * <ul> + * <li>Unmarshals incoming profile data</li> + * <li>Processes profiles into export format</li> + * <li>Aggregates lines for batch processing</li> + * <li>Handles export completion</li> + * <li>Routes data to configured destinations</li> + * </ul> + * </p> + * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -69,6 +112,11 @@ public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilde } + /** + * Sets the processor for handling export completion. + * + * @param exportRouteCompletionProcessor processor for export completion handling + */ public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) { this.exportRouteCompletionProcessor = exportRouteCompletionProcessor; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java index 1ebc5c388..faff0cfb5 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -37,20 +37,59 @@ import java.util.List; import java.util.Map; /** - * Created by amidani on 26/04/2017. + * A Camel route builder that handles the import of profiles from configured sources. + * This route builder creates routes that process incoming profile data from various + * sources and prepares it for import into Unomi. + * + * <p>Features: + * <ul> + * <li>Support for multiple import configurations</li> + * <li>Line-by-line processing of import data</li> + * <li>Error handling and failure reporting</li> + * <li>Configuration validation and status updates</li> + * <li>Support for both Kafka and direct endpoints</li> + * <li>Graceful shutdown handling</li> + * </ul> + * </p> + * + * @since 1.0 */ - public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName()); + /** List of import configurations to process */ private List<ImportConfiguration> importConfigurationList; + + /** Service for managing import configurations */ private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType the type of configuration (kafka/direct) + */ public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the routes for importing profiles from sources. + * Creates routes for each import configuration and sets up error handling. + * + * <p>The routes: + * <ul> + * <li>Handle data validation and format errors</li> + * <li>Process data line by line</li> + * <li>Update import status and progress</li> + * <li>Route processed data to appropriate endpoints</li> + * <li>Manage graceful completion of imports</li> + * </ul> + * </p> + * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -132,10 +171,20 @@ public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuil } } + /** + * Sets the list of import configurations to process. + * + * @param importConfigurationList list of import configurations + */ public void setImportConfigurationList(List<ImportConfiguration> importConfigurationList) { this.importConfigurationList = importConfigurationList; } + /** + * Sets the service for managing import configurations. + * + * @param importConfigurationService service for handling import configurations + */ public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { this.importConfigurationService = importConfigurationService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java index 863437032..6ac223f3d 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -30,18 +30,59 @@ import org.slf4j.LoggerFactory; import java.util.Map; /** - * Created by amidani on 22/05/2017. + * A Camel route builder that handles one-time profile imports from files. + * This route builder creates routes that process CSV files dropped into a + * monitored directory for one-time import operations. + * + * <p>Features: + * <ul> + * <li>File-based import processing</li> + * <li>Configuration lookup from filename</li> + * <li>CSV file processing with error handling</li> + * <li>Support for both Kafka and direct endpoints</li> + * <li>Automatic file movement after processing</li> + * <li>Error reporting and failed file handling</li> + * </ul> + * </p> + * + * @since 1.0 */ public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName()); + + /** Processor for extracting import configuration from filenames */ private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; + + /** Directory to monitor for import files */ private String uploadDir; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType the type of configuration (kafka/direct) + */ public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the route for one-shot profile imports. + * Creates a route that monitors a directory for CSV files and processes them for import. + * + * <p>The route: + * <ul> + * <li>Monitors upload directory for CSV files</li> + * <li>Extracts configuration from filename</li> + * <li>Processes file contents line by line</li> + * <li>Handles validation and format errors</li> + * <li>Routes processed data to appropriate endpoints</li> + * </ul> + * </p> + * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -81,10 +122,20 @@ public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder } } + /** + * Sets the processor for handling import configuration by filename. + * + * @param importConfigByFileNameProcessor processor for filename-based configuration + */ public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { this.importConfigByFileNameProcessor = importConfigByFileNameProcessor; } + /** + * Sets the directory to monitor for import files. + * + * @param uploadDir path to the directory to monitor + */ public void setUploadDir(String uploadDir) { this.uploadDir = uploadDir; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java index ff4942c75..44bebc149 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java @@ -29,19 +29,56 @@ import org.slf4j.LoggerFactory; import java.util.Map; /** - * Created by amidani on 26/04/2017. + * A Camel route builder that handles the final stage of profile imports by storing + * processed profile data into Apache Unomi's storage system. + * + * <p>Features: + * <ul> + * <li>Final processing of imported profiles</li> + * <li>Integration with Unomi's storage system</li> + * <li>Support for both Kafka and direct endpoints</li> + * <li>Import completion handling</li> + * <li>Error handling and reporting</li> + * </ul> + * </p> + * + * @since 1.0 */ public class ProfileImportToUnomiRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName()); + /** Processor for storing profiles in Unomi */ private UnomiStorageProcessor unomiStorageProcessor; + + /** Processor for handling import completion */ private ImportRouteCompletionProcessor importRouteCompletionProcessor; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType the type of configuration (kafka/direct) + */ public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the route for storing imported profiles in Unomi. + * Creates a route that processes incoming profile data and stores it in Unomi's storage system. + * + * <p>The route: + * <ul> + * <li>Receives processed profile data</li> + * <li>Stores profiles in Unomi's storage system</li> + * <li>Handles import completion</li> + * <li>Manages error reporting</li> + * </ul> + * </p> + * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -67,10 +104,20 @@ public class ProfileImportToUnomiRouteBuilder extends RouterAbstractRouteBuilder .to("log:org.apache.unomi.router?level=DEBUG"); } + /** + * Sets the processor for storing profiles in Unomi. + * + * @param unomiStorageProcessor processor for Unomi storage operations + */ public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { this.unomiStorageProcessor = unomiStorageProcessor; } + /** + * Sets the processor for handling import completion. + * + * @param importRouteCompletionProcessor processor for import completion operations + */ public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) { this.importRouteCompletionProcessor = importRouteCompletionProcessor; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java index ad06a00ec..5ca80bfff 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java @@ -28,26 +28,66 @@ import org.apache.unomi.router.api.RouterConstants; import java.util.Map; /** - * Created by amidani on 13/06/2017. + * Abstract base class for all Unomi router route builders. + * This class provides common functionality and configuration for both import + * and export routes, supporting both Kafka and direct endpoint configurations. + * + * <p>Features: + * <ul> + * <li>Common Kafka configuration handling</li> + * <li>Endpoint URI generation for both Kafka and direct modes</li> + * <li>Shared configuration for JSON data format</li> + * <li>Profile service integration</li> + * <li>Endpoint security through allowlist</li> + * </ul> + * </p> + * + * @since 1.0 */ public abstract class RouterAbstractRouteBuilder extends RouteBuilder { + /** JSON data format configuration */ protected JacksonDataFormat jacksonDataFormat; + /** Kafka broker host */ protected String kafkaHost; + + /** Kafka broker port */ protected String kafkaPort; + + /** Topic for import operations */ protected String kafkaImportTopic; + + /** Topic for export operations */ protected String kafkaExportTopic; + + /** Consumer group ID for import operations */ protected String kafkaImportGroupId; + + /** Consumer group ID for export operations */ protected String kafkaExportGroupId; + + /** Number of Kafka consumers */ protected String kafkaConsumerCount; + + /** Auto-commit configuration for Kafka */ protected String kafkaAutoCommit; + /** Configuration type (kafka/direct) */ protected String configType; + + /** List of allowed endpoint schemes */ protected String allowedEndpoints; + /** Service for profile operations */ protected ProfileService profileService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType the type of configuration (kafka/direct) + */ public RouterAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) { this.kafkaHost = kafkaProps.get("kafkaHost"); this.kafkaPort = kafkaProps.get("kafkaPort"); @@ -60,6 +100,21 @@ public abstract class RouterAbstractRouteBuilder extends RouteBuilder { this.configType = configType; } + /** + * Gets the appropriate endpoint URI based on configuration type and operation. + * + * <p>This method: + * <ul> + * <li>Creates Kafka endpoints with appropriate configuration when using Kafka</li> + * <li>Returns direct endpoint URIs when not using Kafka</li> + * <li>Configures consumer properties for incoming endpoints</li> + * </ul> + * </p> + * + * @param direction the direction of the endpoint (to/from) + * @param operationDepositBuffer the operation buffer identifier + * @return Object either a KafkaEndpoint or String depending on configuration + */ public Object getEndpointURI(String direction, String operationDepositBuffer) { Object endpoint; if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { @@ -91,14 +146,29 @@ public abstract class RouterAbstractRouteBuilder extends RouteBuilder { return endpoint; } + /** + * Sets the JSON data format configuration. + * + * @param jacksonDataFormat the JSON data format to use + */ public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { this.jacksonDataFormat = jacksonDataFormat; } + /** + * Sets the list of allowed endpoint schemes. + * + * @param allowedEndpoints comma-separated list of allowed endpoint schemes + */ public void setAllowedEndpoints(String allowedEndpoints) { this.allowedEndpoints = allowedEndpoints; } + /** + * Sets the profile service. + * + * @param profileService the service for profile operations + */ public void setProfileService(ProfileService profileService) { this.profileService = profileService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java index ca87ad3bd..c113e3aeb 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java @@ -22,11 +22,40 @@ import org.apache.camel.processor.aggregate.AggregationStrategy; import java.util.ArrayList; /** - * Created by amidani on 16/06/2017. + * An implementation of Camel's AggregationStrategy that aggregates exchange bodies into an ArrayList. + * This strategy is useful when you need to collect multiple messages into a single list for batch processing + * or grouped operations within the Unomi Router. + * + * <p>The strategy maintains the following behavior: + * <ul> + * <li>For the first message (when oldExchange is null), it creates a new ArrayList and adds the message body to it</li> + * <li>For subsequent messages, it adds the new message body to the existing ArrayList</li> + * </ul> + * </p> + * + * <p>The ArrayList is maintained in the exchange body, allowing for easy access to all aggregated items + * once the aggregation is complete.</p> + * + * @since 1.0 */ public class ArrayListAggregationStrategy implements AggregationStrategy { - + /** + * Aggregates exchange messages by collecting their bodies into an ArrayList. + * + * <p>This method implements the core aggregation logic where: + * <ul> + * <li>The new exchange's body is extracted as is (maintaining its original type)</li> + * <li>If this is the first message, a new ArrayList is created to store the messages</li> + * <li>The new body is added to the ArrayList</li> + * <li>The ArrayList is maintained in the exchange body for subsequent aggregations</li> + * </ul> + * </p> + * + * @param oldExchange the previous exchange being aggregated (may be null on first invocation) + * @param newExchange the current exchange being aggregated (contains the new item to add to the list) + * @return the aggregated exchange containing the ArrayList of all aggregated items + */ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody = newExchange.getIn().getBody(); ArrayList<Object> list = null; diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java index d01859f22..8ccabe687 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java @@ -22,10 +22,40 @@ import org.apache.unomi.router.api.ExportConfiguration; import org.apache.unomi.router.api.RouterUtils; /** - * Created by amidani on 29/06/2017. + * An implementation of Camel's AggregationStrategy that combines multiple text lines into a single string + * for export purposes. This strategy is specifically designed to work with the Unomi Router's export functionality, + * where multiple data lines need to be aggregated into a single export file. + * + * <p>The strategy maintains the following behavior: + * <ul> + * <li>For the first message (when oldExchange is null), it simply returns the new exchange</li> + * <li>For subsequent messages, it appends the new content to the existing content using the configured line separator</li> + * </ul> + * </p> + * + * <p>The line separator used for aggregation is obtained from the ExportConfiguration object + * stored in the exchange header under the key "exportConfig".</p> + * + * @since 1.0 */ public class StringLinesAggregationStrategy implements AggregationStrategy { + /** + * Aggregates two exchanges by combining their body content with appropriate line separation. + * + * <p>This method implements the core aggregation logic where: + * <ul> + * <li>The new exchange's body is extracted as a String</li> + * <li>The line separator is obtained from the export configuration in the exchange header</li> + * <li>If there's an old exchange, the new content is appended to it with the line separator</li> + * <li>If there's no old exchange, the new exchange is returned as is</li> + * </ul> + * </p> + * + * @param oldExchange the previous exchange being aggregated (may be null on first invocation) + * @param newExchange the current exchange being aggregated (contains the new line to append) + * @return the aggregated exchange containing the combined content + */ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody = newExchange.getIn().getBody(String.class); String lineSeparator = newExchange.getIn().getHeader("exportConfig", ExportConfiguration.class).getLineSeparator();
