This is an automated email from the ASF dual-hosted git repository. shuber pushed a commit to branch unomi-3-dev in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 80babbd9bb341ad62ef492f0e6b8ff16dadfebaa Merge: 8347976fc 15ae3eaca Author: Serge Huber <[email protected]> AuthorDate: Fri Jan 9 20:37:49 2026 +0100 Merge master into unomi-3-dev Merged latest changes from origin/master including: - [UNOMI-924] Add healthcheck activated by default (#748) - [UNOMI-922] Inconsistency between value for nbOfVisits in Profile and number of Sessions (#744) - chore: github settings to only enable squash commits (#747) Resolved conflicts in: - pom.xml - itests/pom.xml - itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java - services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java .asf.yaml | 7 ++ docker/src/main/docker/docker-compose-cluster.yml | 2 +- .../org.apache.unomi.healthcheck-elasticsearch.cfg | 2 +- .../org.apache.unomi.healthcheck-opensearch.cfg | 2 +- itests/README.md | 18 +-- .../org/apache/unomi/itests/ProfileServiceIT.java | 71 +++++++++++- .../migration/Migrate16xToCurrentVersionIT.java | 59 +++++++--- .../resources/migration/snapshots_repository.zip | Bin 3252696 -> 3294102 bytes manual/src/main/asciidoc/configuration.adoc | 9 ++ manual/src/main/asciidoc/datamodel.adoc | 2 + .../resources/META-INF/cxs/mappings/profile.json | 3 + .../resources/META-INF/cxs/mappings/profile.json | 3 + .../resources/META-INF/cxs/expressions/mvel.json | 3 +- .../META-INF/cxs/rules/sessionAssigned.json | 7 ++ pom.xml | 4 +- .../services/impl/profiles/ProfileServiceImpl.java | 122 +++++++++++++++++++++ .../META-INF/cxs/painless/decNbOfVisits.painless | 30 +++++ .../cxs/properties/profiles/system/nbOfVisits.json | 2 +- .../{nbOfVisits.json => totalNbOfVisits.json} | 8 +- services/src/main/resources/messages_de.properties | 1 + services/src/main/resources/messages_en.properties | 1 + .../migrate-3.1.0-00-fixProfileNbOfVisits.groovy | 98 +++++++++++++++++ .../copy_nbOfVisits_to_totalNbOfVisits.painless | 25 +++++ .../3.1.0/count_sessions_by_profile.json | 19 ++++ .../3.1.0/profile_copy_nbOfVisits_request.json | 25 +++++ .../requestBody/3.1.0/profile_scroll_query.json | 23 ++++ 26 files changed, 510 insertions(+), 36 deletions(-) diff --cc itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java index 830246aa8,b60265582..3edbd9e35 --- a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java @@@ -52,55 -47,10 +52,55 @@@ public class Migrate16xToCurrentVersion "context-userlist", "context-propertytype", "context-scope", "context-conditiontype", "context-rule", "context-scoring", "context-segment", "context-groovyaction", "context-topic", "context-patch", "context-jsonschema", "context-importconfig", "context-exportconfig", "context-rulestats"); - public void checkSearchEngine() { - searchEngine = System.getProperty(SEARCH_ENGINE_PROPERTY, SEARCH_ENGINE_ELASTICSEARCH); - System.out.println("Check search engine: " + searchEngine); + // Elasticsearch connection constants + private static String getEsBaseUrl() { + return "http://localhost:" + getSearchPort(); } + private static String getEsSnapshotRepo() { + return getEsBaseUrl() + "/_snapshot/snapshots_repository/"; + } + private static String getEsSnapshotStatus() { + return getEsBaseUrl() + "/_snapshot/_status"; + } - private static final String ES_SNAPSHOT_2 = "snapshot_2"; ++ private static final String ES_SNAPSHOT_3 = "snapshot_3"; + private static String getEsSnapshotRestoreUrl() { - return getEsSnapshotRepo() + ES_SNAPSHOT_2 + "/_restore?wait_for_completion=true"; ++ return getEsSnapshotRepo() + ES_SNAPSHOT_3 + "/_restore?wait_for_completion=true"; + } + + // Index prefix constants + private static final String INDEX_PREFIX_CONTEXT = "context-"; + private static final String INDEX_EVENT = INDEX_PREFIX_CONTEXT + "event-"; + private static final String INDEX_SESSION = INDEX_PREFIX_CONTEXT + "session-"; + private static final String INDEX_SYSTEMITEMS = INDEX_PREFIX_CONTEXT + "systemitems"; + private static final String INDEX_PROFILE = INDEX_PREFIX_CONTEXT + "profile"; + + // Resource path constants + private static final String RESOURCE_MIGRATION = "migration/"; + private static final String RESOURCE_CREATE_SNAPSHOTS_REPO = RESOURCE_MIGRATION + "create_snapshots_repository.json"; + private static final String RESOURCE_MUST_NOT_MATCH_EVENTTYPE = RESOURCE_MIGRATION + "must_not_match_some_eventype_body.json"; + private static final String RESOURCE_MATCH_ALL_LOGIN_EVENT = RESOURCE_MIGRATION + "match_all_login_event_request.json"; + + // Scope constants + private static final String SCOPE_SYSTEMSITE = "systemsite"; + private static final String SCOPE_DIGITALL = "digitall"; + + // Event type constants + private static final String EVENT_TYPE_FORM = "form"; + private static final String EVENT_TYPE_VIEW = "view"; + private static final String EVENT_TYPE_UPDATE_PROPERTIES = "updateProperties"; + private static final String EVENT_TYPE_SESSION_CREATED = "sessionCreated"; + + // Profile constants + private static final String PROFILE_FIRST_NAME = "firstName"; + private static final String PROFILE_INTERESTS = "interests"; + private static final String PROFILE_PAST_EVENTS = "pastEvents"; + + // System item types + private static final List<String> SYSTEM_ITEM_TYPES = Arrays.asList("segment", "rule", "scope"); + + // Migration command + private static final String MIGRATION_COMMAND = "unomi:migrate 1.6.0 true"; + private static final long MIGRATION_TIMEOUT = 900000L; @Override @Before @@@ -121,10 -69,10 +121,10 @@@ // Restore snapshot from 1.6.x try (CloseableHttpClient httpClient = HttpUtils.initHttpClient(true, null)) { // Create snapshot repo - HttpUtils.executePutRequest(httpClient, "http://localhost:9400/_snapshot/snapshots_repository/", resourceAsString("migration/create_snapshots_repository.json"), null); + HttpUtils.executePutRequest(httpClient, getEsSnapshotRepo(), resourceAsString(RESOURCE_CREATE_SNAPSHOTS_REPO), null); // Get snapshot, insure it exists - String snapshot = HttpUtils.executeGetRequest(httpClient, getEsSnapshotRepo() + ES_SNAPSHOT_2, null); - if (snapshot == null || !snapshot.contains(ES_SNAPSHOT_2)) { - String snapshot = HttpUtils.executeGetRequest(httpClient, "http://localhost:9400/_snapshot/snapshots_repository/snapshot_3", null); - if (snapshot == null || !snapshot.contains("snapshot_3")) { ++ String snapshot = HttpUtils.executeGetRequest(httpClient, getEsSnapshotRepo() + ES_SNAPSHOT_3, null); ++ if (snapshot == null || !snapshot.contains(ES_SNAPSHOT_3)) { throw new RuntimeException("Unable to retrieve 1.6.x snapshot for ES restore"); } // Restore the snapshot diff --cc pom.xml index acc35166e,25360797d..66c14101e --- a/pom.xml +++ b/pom.xml @@@ -64,10 -64,10 +64,10 @@@ <maven.compiler.release>${java.version}</maven.compiler.release> <karaf.version>4.4.8</karaf.version> - <elasticsearch.version>9.1.3</elasticsearch.version> - <elasticsearch.test.version>9.1.3</elasticsearch.test.version> + <elasticsearch.version>9.1.4</elasticsearch.version> + <elasticsearch.test.version>9.2.1</elasticsearch.test.version> - <opensearch.version>3.0.0</opensearch.version> - <opensearch.rest.client.version>3.0.0</opensearch.rest.client.version> + <opensearch.version>3.4.0</opensearch.version> + <opensearch.rest.client.version>3.4.0</opensearch.rest.client.version> <httpclient5.version>5.2.1</httpclient5.version> <aws.sdk.version>2.28.14</aws.sdk.version> <javax-validation.version>1.1.0.Final</javax-validation.version> diff --cc services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java index 61b077375,9673a707c..cb0ad7544 --- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java @@@ -47,12 -47,128 +47,14 @@@ import java.util.stream.Collectors import static org.apache.unomi.persistence.spi.CustomObjectMapper.getObjectMapper; -public class ProfileServiceImpl implements ProfileService, SynchronousBundleListener { - - private static final String DECREMENT_NB_OF_VISITS_SCRIPT = "decNbOfVisits"; - - /** - * This class is responsible for storing property types and permits optimized access to them. - * In order to assure data consistency, thread-safety and performance, this class is immutable and every operation on - * property types requires creating a new instance (copy-on-write). - */ - private static class PropertyTypes { - private final List<PropertyType> allPropertyTypes; - private Map<String, PropertyType> propertyTypesById = new HashMap<>(); - private Map<String, List<PropertyType>> propertyTypesByTags = new HashMap<>(); - private Map<String, List<PropertyType>> propertyTypesBySystemTags = new HashMap<>(); - private Map<String, List<PropertyType>> propertyTypesByTarget = new HashMap<>(); - - public PropertyTypes(List<PropertyType> allPropertyTypes) { - this.allPropertyTypes = new ArrayList<>(allPropertyTypes); - propertyTypesById = new HashMap<>(); - propertyTypesByTags = new HashMap<>(); - propertyTypesBySystemTags = new HashMap<>(); - propertyTypesByTarget = new HashMap<>(); - for (PropertyType propertyType : allPropertyTypes) { - propertyTypesById.put(propertyType.getItemId(), propertyType); - for (String propertyTypeTag : propertyType.getMetadata().getTags()) { - updateListMap(propertyTypesByTags, propertyType, propertyTypeTag); - } - for (String propertyTypeSystemTag : propertyType.getMetadata().getSystemTags()) { - updateListMap(propertyTypesBySystemTags, propertyType, propertyTypeSystemTag); - } - updateListMap(propertyTypesByTarget, propertyType, propertyType.getTarget()); - } - } - - public List<PropertyType> getAll() { - return allPropertyTypes; - } - - public PropertyType get(String propertyId) { - return propertyTypesById.get(propertyId); - } - - public Map<String, List<PropertyType>> getAllByTarget() { - return propertyTypesByTarget; - } - - public List<PropertyType> getByTag(String tag) { - return propertyTypesByTags.get(tag); - } - - public List<PropertyType> getBySystemTag(String systemTag) { - return propertyTypesBySystemTags.get(systemTag); - } - - public List<PropertyType> getByTarget(String target) { - return propertyTypesByTarget.get(target); - } - - public PropertyTypes with(PropertyType newProperty) { - return with(Collections.singletonList(newProperty)); - } - - /** - * Creates a new instance of this class containing given property types. - * If property types with the same ID existed before, they will be replaced by the new ones. - * - * @param newProperties list of property types to change - * @return new instance - */ - public PropertyTypes with(List<PropertyType> newProperties) { - Map<String, PropertyType> updatedProperties = new HashMap<>(); - for (PropertyType property : newProperties) { - if (propertyTypesById.containsKey(property.getItemId())) { - updatedProperties.put(property.getItemId(), property); - } - } - - List<PropertyType> newPropertyTypes = Stream.concat( - allPropertyTypes.stream().map(property -> updatedProperties.getOrDefault(property.getItemId(), property)), - newProperties.stream().filter(property -> !propertyTypesById.containsKey(property.getItemId())) - ).collect(Collectors.toList()); - - return new PropertyTypes(newPropertyTypes); - } - - /** - * Creates a new instance of this class containing all property types except the one with given ID. - * - * @param propertyId ID of the property to delete - * @return new instance - */ - public PropertyTypes without(String propertyId) { - List<PropertyType> newPropertyTypes = allPropertyTypes.stream() - .filter(property -> !property.getItemId().equals(propertyId)) - .collect(Collectors.toList()); - - return new PropertyTypes(newPropertyTypes); - } - - private void updateListMap(Map<String, List<PropertyType>> listMap, PropertyType propertyType, String key) { - List<PropertyType> propertyTypes = listMap.get(key); - if (propertyTypes == null) { - propertyTypes = new ArrayList<>(); - } - propertyTypes.add(propertyType); - listMap.put(key, propertyTypes); - } - - } +public class ProfileServiceImpl extends AbstractMultiTypeCachingService implements ProfileService { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileServiceImpl.class.getName()); - private static final int NB_OF_VISITS_DECREMENT_BATCH_SIZE = 500; - private BundleContext bundleContext; - - private PersistenceService persistenceService; ++ private static final String DECREMENT_NB_OF_VISITS_SCRIPT = "decNbOfVisits"; + private DefinitionsService definitionsService; - private SchedulerService schedulerService; - private SegmentService segmentService; private Integer purgeProfileExistTime = 0; @@@ -235,62 -489,48 +334,85 @@@ } private void initializePurge() { + LOGGER.info("Purge: Initializing"); + + if (purgeProfileExistTime <= 0 && purgeProfileInactiveTime <= 0 && purgeSessionExistTime <= 0 && purgeEventExistTime <= 0) { + return; + } + + if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionExistTime > 0 || purgeEventExistTime > 0) { + if (purgeProfileInactiveTime > 0) { + LOGGER.info("Purge: Profile with no visits since more than {} days, will be purged", purgeProfileInactiveTime); + } + if (purgeProfileExistTime > 0) { + LOGGER.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime); + } + + if (purgeSessionExistTime > 0) { + LOGGER.info("Purge: Session items created since more than {} days, will be purged", purgeSessionExistTime); + } + if (purgeEventExistTime > 0) { + LOGGER.info("Purge: Event items created since more than {} days, will be purged", purgeEventExistTime); + } + - purgeTask = new TimerTask() { - @Override - public void run() { + // Register the task executor for profile purge + TaskExecutor profilePurgeExecutor = new TaskExecutor() { + @Override + public String getTaskType() { + return "profile-purge"; + } + + @Override + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + contextManager.executeAsSystem(() -> { try { + long purgeStartTime = System.currentTimeMillis(); + LOGGER.info("Purge: triggered"); + + // Profile purge purgeProfiles(purgeProfileInactiveTime, purgeProfileExistTime); - - // Monthly items purge - purgeSessionItems(purgeSessionExistTime); - purgeEventItems(purgeEventExistTime); + if (purgeSessionExistTime > 0) { + purgeSessionItems(purgeSessionExistTime); + } + if (purgeEventExistTime > 0) { + purgeEventItems(purgeEventExistTime); + } + LOGGER.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime); ++ + callback.complete(); } catch (Throwable t) { - LOGGER.error("Error while purging", t); + // During shutdown, services may be unavailable - only log if not shutting down + LOGGER.error("Error while purging profiles, sessions, or events", t); + callback.fail(t.getMessage()); } - } - }; - - schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1, purgeProfileInterval, TimeUnit.DAYS); + return null; + }); + } + }; - LOGGER.info("Purge: purge scheduled with an interval of {} days", purgeProfileInterval); + schedulerService.registerTaskExecutor(profilePurgeExecutor); + + // Check if a purge task already exists + List<ScheduledTask> existingTasks = schedulerService.getTasksByType("profile-purge", 0, 1, null).getList(); + if (!existingTasks.isEmpty() && existingTasks.get(0).isSystemTask()) { + // Reuse the existing task if it's a system task + purgeTask = existingTasks.get(0); + // Update task configuration if needed + purgeTask.setPeriod(purgeProfileInterval); + purgeTask.setTimeUnit(TimeUnit.DAYS); + purgeTask.setFixedRate(true); + purgeTask.setEnabled(true); + schedulerService.saveTask(purgeTask); + LOGGER.info("Reusing existing system purge task: {}", purgeTask.getItemId()); } else { - LOGGER.info("Purge: No purge scheduled"); + // Create a new task if none exists or existing one isn't a system task + purgeTask = schedulerService.newTask("profile-purge") + .withPeriod(purgeProfileInterval, TimeUnit.DAYS) + .withFixedRate() // Run at fixed intervals + // By default tasks run on a single node, no need to explicitly set it + .asSystemTask() // Mark as a system task + .schedule(); + LOGGER.info("Created new system purge task: {}", purgeTask.getItemId()); } }
