This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch unomi-3-dev
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 80babbd9bb341ad62ef492f0e6b8ff16dadfebaa
Merge: 8347976fc 15ae3eaca
Author: Serge Huber <[email protected]>
AuthorDate: Fri Jan 9 20:37:49 2026 +0100

    Merge master into unomi-3-dev
    
    Merged latest changes from origin/master including:
    - [UNOMI-924] Add healthcheck activated by default (#748)
    - [UNOMI-922] Inconsistency between value for nbOfVisits in Profile and 
number of Sessions (#744)
    - chore: github settings to only enable squash commits (#747)
    
    Resolved conflicts in:
    - pom.xml
    - itests/pom.xml
    - 
itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java
    - 
services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java

 .asf.yaml                                          |   7 ++
 docker/src/main/docker/docker-compose-cluster.yml  |   2 +-
 .../org.apache.unomi.healthcheck-elasticsearch.cfg |   2 +-
 .../org.apache.unomi.healthcheck-opensearch.cfg    |   2 +-
 itests/README.md                                   |  18 +--
 .../org/apache/unomi/itests/ProfileServiceIT.java  |  71 +++++++++++-
 .../migration/Migrate16xToCurrentVersionIT.java    |  59 +++++++---
 .../resources/migration/snapshots_repository.zip   | Bin 3252696 -> 3294102 
bytes
 manual/src/main/asciidoc/configuration.adoc        |   9 ++
 manual/src/main/asciidoc/datamodel.adoc            |   2 +
 .../resources/META-INF/cxs/mappings/profile.json   |   3 +
 .../resources/META-INF/cxs/mappings/profile.json   |   3 +
 .../resources/META-INF/cxs/expressions/mvel.json   |   3 +-
 .../META-INF/cxs/rules/sessionAssigned.json        |   7 ++
 pom.xml                                            |   4 +-
 .../services/impl/profiles/ProfileServiceImpl.java | 122 +++++++++++++++++++++
 .../META-INF/cxs/painless/decNbOfVisits.painless   |  30 +++++
 .../cxs/properties/profiles/system/nbOfVisits.json |   2 +-
 .../{nbOfVisits.json => totalNbOfVisits.json}      |   8 +-
 services/src/main/resources/messages_de.properties |   1 +
 services/src/main/resources/messages_en.properties |   1 +
 .../migrate-3.1.0-00-fixProfileNbOfVisits.groovy   |  98 +++++++++++++++++
 .../copy_nbOfVisits_to_totalNbOfVisits.painless    |  25 +++++
 .../3.1.0/count_sessions_by_profile.json           |  19 ++++
 .../3.1.0/profile_copy_nbOfVisits_request.json     |  25 +++++
 .../requestBody/3.1.0/profile_scroll_query.json    |  23 ++++
 26 files changed, 510 insertions(+), 36 deletions(-)

diff --cc 
itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java
index 830246aa8,b60265582..3edbd9e35
--- 
a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java
+++ 
b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xToCurrentVersionIT.java
@@@ -52,55 -47,10 +52,55 @@@ public class Migrate16xToCurrentVersion
              "context-userlist", "context-propertytype", "context-scope", 
"context-conditiontype", "context-rule", "context-scoring", "context-segment", 
"context-groovyaction", "context-topic",
              "context-patch", "context-jsonschema", "context-importconfig", 
"context-exportconfig", "context-rulestats");
  
 -    public void checkSearchEngine() {
 -        searchEngine = System.getProperty(SEARCH_ENGINE_PROPERTY, 
SEARCH_ENGINE_ELASTICSEARCH);
 -        System.out.println("Check search engine: " + searchEngine);
 +    // Elasticsearch connection constants
 +    private static String getEsBaseUrl() {
 +        return "http://localhost:" + getSearchPort();
      }
 +    private static String getEsSnapshotRepo() {
 +        return getEsBaseUrl() + "/_snapshot/snapshots_repository/";
 +    }
 +    private static String getEsSnapshotStatus() {
 +        return getEsBaseUrl() + "/_snapshot/_status";
 +    }
-     private static final String ES_SNAPSHOT_2 = "snapshot_2";
++    private static final String ES_SNAPSHOT_3 = "snapshot_3";
 +    private static String getEsSnapshotRestoreUrl() {
-         return getEsSnapshotRepo() + ES_SNAPSHOT_2 + 
"/_restore?wait_for_completion=true";
++        return getEsSnapshotRepo() + ES_SNAPSHOT_3 + 
"/_restore?wait_for_completion=true";
 +    }
 +
 +    // Index prefix constants
 +    private static final String INDEX_PREFIX_CONTEXT = "context-";
 +    private static final String INDEX_EVENT = INDEX_PREFIX_CONTEXT + "event-";
 +    private static final String INDEX_SESSION = INDEX_PREFIX_CONTEXT + 
"session-";
 +    private static final String INDEX_SYSTEMITEMS = INDEX_PREFIX_CONTEXT + 
"systemitems";
 +    private static final String INDEX_PROFILE = INDEX_PREFIX_CONTEXT + 
"profile";
 +
 +    // Resource path constants
 +    private static final String RESOURCE_MIGRATION = "migration/";
 +    private static final String RESOURCE_CREATE_SNAPSHOTS_REPO = 
RESOURCE_MIGRATION + "create_snapshots_repository.json";
 +    private static final String RESOURCE_MUST_NOT_MATCH_EVENTTYPE = 
RESOURCE_MIGRATION + "must_not_match_some_eventype_body.json";
 +    private static final String RESOURCE_MATCH_ALL_LOGIN_EVENT = 
RESOURCE_MIGRATION + "match_all_login_event_request.json";
 +
 +    // Scope constants
 +    private static final String SCOPE_SYSTEMSITE = "systemsite";
 +    private static final String SCOPE_DIGITALL = "digitall";
 +
 +    // Event type constants
 +    private static final String EVENT_TYPE_FORM = "form";
 +    private static final String EVENT_TYPE_VIEW = "view";
 +    private static final String EVENT_TYPE_UPDATE_PROPERTIES = 
"updateProperties";
 +    private static final String EVENT_TYPE_SESSION_CREATED = "sessionCreated";
 +
 +    // Profile constants
 +    private static final String PROFILE_FIRST_NAME = "firstName";
 +    private static final String PROFILE_INTERESTS = "interests";
 +    private static final String PROFILE_PAST_EVENTS = "pastEvents";
 +
 +    // System item types
 +    private static final List<String> SYSTEM_ITEM_TYPES = 
Arrays.asList("segment", "rule", "scope");
 +
 +    // Migration command
 +    private static final String MIGRATION_COMMAND = "unomi:migrate 1.6.0 
true";
 +    private static final long MIGRATION_TIMEOUT = 900000L;
  
      @Override
      @Before
@@@ -121,10 -69,10 +121,10 @@@
          // Restore snapshot from 1.6.x
          try (CloseableHttpClient httpClient = HttpUtils.initHttpClient(true, 
null)) {
              // Create snapshot repo
 -            HttpUtils.executePutRequest(httpClient, 
"http://localhost:9400/_snapshot/snapshots_repository/", 
resourceAsString("migration/create_snapshots_repository.json"), null);
 +            HttpUtils.executePutRequest(httpClient, getEsSnapshotRepo(), 
resourceAsString(RESOURCE_CREATE_SNAPSHOTS_REPO), null);
              // Get snapshot, insure it exists
-             String snapshot = HttpUtils.executeGetRequest(httpClient, 
getEsSnapshotRepo() + ES_SNAPSHOT_2, null);
-             if (snapshot == null || !snapshot.contains(ES_SNAPSHOT_2)) {
 -            String snapshot = HttpUtils.executeGetRequest(httpClient, 
"http://localhost:9400/_snapshot/snapshots_repository/snapshot_3", null);
 -            if (snapshot == null || !snapshot.contains("snapshot_3")) {
++            String snapshot = HttpUtils.executeGetRequest(httpClient, 
getEsSnapshotRepo() + ES_SNAPSHOT_3, null);
++            if (snapshot == null || !snapshot.contains(ES_SNAPSHOT_3)) {
                  throw new RuntimeException("Unable to retrieve 1.6.x snapshot 
for ES restore");
              }
              // Restore the snapshot
diff --cc pom.xml
index acc35166e,25360797d..66c14101e
--- a/pom.xml
+++ b/pom.xml
@@@ -64,10 -64,10 +64,10 @@@
          <maven.compiler.release>${java.version}</maven.compiler.release>
  
          <karaf.version>4.4.8</karaf.version>
-         <elasticsearch.version>9.1.3</elasticsearch.version>
-         <elasticsearch.test.version>9.1.3</elasticsearch.test.version>
+         <elasticsearch.version>9.1.4</elasticsearch.version>
+         <elasticsearch.test.version>9.2.1</elasticsearch.test.version>
 -        <opensearch.version>3.0.0</opensearch.version>
 -        <opensearch.rest.client.version>3.0.0</opensearch.rest.client.version>
 +        <opensearch.version>3.4.0</opensearch.version>
 +        <opensearch.rest.client.version>3.4.0</opensearch.rest.client.version>
          <httpclient5.version>5.2.1</httpclient5.version>
          <aws.sdk.version>2.28.14</aws.sdk.version>
          <javax-validation.version>1.1.0.Final</javax-validation.version>
diff --cc 
services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 61b077375,9673a707c..cb0ad7544
--- 
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@@ -47,12 -47,128 +47,14 @@@ import java.util.stream.Collectors
  
  import static 
org.apache.unomi.persistence.spi.CustomObjectMapper.getObjectMapper;
  
 -public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleListener {
 -
 -    private static final String DECREMENT_NB_OF_VISITS_SCRIPT = 
"decNbOfVisits";
 -
 -    /**
 -     * This class is responsible for storing property types and permits 
optimized access to them.
 -     * In order to assure data consistency, thread-safety and performance, 
this class is immutable and every operation on
 -     * property types requires creating a new instance (copy-on-write).
 -     */
 -    private static class PropertyTypes {
 -        private final List<PropertyType> allPropertyTypes;
 -        private Map<String, PropertyType> propertyTypesById = new HashMap<>();
 -        private Map<String, List<PropertyType>> propertyTypesByTags = new 
HashMap<>();
 -        private Map<String, List<PropertyType>> propertyTypesBySystemTags = 
new HashMap<>();
 -        private Map<String, List<PropertyType>> propertyTypesByTarget = new 
HashMap<>();
 -
 -        public PropertyTypes(List<PropertyType> allPropertyTypes) {
 -            this.allPropertyTypes = new ArrayList<>(allPropertyTypes);
 -            propertyTypesById = new HashMap<>();
 -            propertyTypesByTags = new HashMap<>();
 -            propertyTypesBySystemTags = new HashMap<>();
 -            propertyTypesByTarget = new HashMap<>();
 -            for (PropertyType propertyType : allPropertyTypes) {
 -                propertyTypesById.put(propertyType.getItemId(), propertyType);
 -                for (String propertyTypeTag : 
propertyType.getMetadata().getTags()) {
 -                    updateListMap(propertyTypesByTags, propertyType, 
propertyTypeTag);
 -                }
 -                for (String propertyTypeSystemTag : 
propertyType.getMetadata().getSystemTags()) {
 -                    updateListMap(propertyTypesBySystemTags, propertyType, 
propertyTypeSystemTag);
 -                }
 -                updateListMap(propertyTypesByTarget, propertyType, 
propertyType.getTarget());
 -            }
 -        }
 -
 -        public List<PropertyType> getAll() {
 -            return allPropertyTypes;
 -        }
 -
 -        public PropertyType get(String propertyId) {
 -            return propertyTypesById.get(propertyId);
 -        }
 -
 -        public Map<String, List<PropertyType>> getAllByTarget() {
 -            return propertyTypesByTarget;
 -        }
 -
 -        public List<PropertyType> getByTag(String tag) {
 -            return propertyTypesByTags.get(tag);
 -        }
 -
 -        public List<PropertyType> getBySystemTag(String systemTag) {
 -            return propertyTypesBySystemTags.get(systemTag);
 -        }
 -
 -        public List<PropertyType> getByTarget(String target) {
 -            return propertyTypesByTarget.get(target);
 -        }
 -
 -        public PropertyTypes with(PropertyType newProperty) {
 -            return with(Collections.singletonList(newProperty));
 -        }
 -
 -        /**
 -         * Creates a new instance of this class containing given property 
types.
 -         * If property types with the same ID existed before, they will be 
replaced by the new ones.
 -         *
 -         * @param newProperties list of property types to change
 -         * @return new instance
 -         */
 -        public PropertyTypes with(List<PropertyType> newProperties) {
 -            Map<String, PropertyType> updatedProperties = new HashMap<>();
 -            for (PropertyType property : newProperties) {
 -                if (propertyTypesById.containsKey(property.getItemId())) {
 -                    updatedProperties.put(property.getItemId(), property);
 -                }
 -            }
 -
 -            List<PropertyType> newPropertyTypes = Stream.concat(
 -                    allPropertyTypes.stream().map(property -> 
updatedProperties.getOrDefault(property.getItemId(), property)),
 -                    newProperties.stream().filter(property -> 
!propertyTypesById.containsKey(property.getItemId()))
 -            ).collect(Collectors.toList());
 -
 -            return new PropertyTypes(newPropertyTypes);
 -        }
 -
 -        /**
 -         * Creates a new instance of this class containing all property types 
except the one with given ID.
 -         *
 -         * @param propertyId ID of the property to delete
 -         * @return new instance
 -         */
 -        public PropertyTypes without(String propertyId) {
 -            List<PropertyType> newPropertyTypes = allPropertyTypes.stream()
 -                    .filter(property -> 
!property.getItemId().equals(propertyId))
 -                    .collect(Collectors.toList());
 -
 -            return new PropertyTypes(newPropertyTypes);
 -        }
 -
 -        private void updateListMap(Map<String, List<PropertyType>> listMap, 
PropertyType propertyType, String key) {
 -            List<PropertyType> propertyTypes = listMap.get(key);
 -            if (propertyTypes == null) {
 -                propertyTypes = new ArrayList<>();
 -            }
 -            propertyTypes.add(propertyType);
 -            listMap.put(key, propertyTypes);
 -        }
 -
 -    }
 +public class ProfileServiceImpl extends AbstractMultiTypeCachingService 
implements ProfileService {
  
      private static final Logger LOGGER = 
LoggerFactory.getLogger(ProfileServiceImpl.class.getName());
 -    private static final int NB_OF_VISITS_DECREMENT_BATCH_SIZE = 500;
  
 -    private BundleContext bundleContext;
 -
 -    private PersistenceService persistenceService;
++    private static final String DECREMENT_NB_OF_VISITS_SCRIPT = 
"decNbOfVisits";
+ 
      private DefinitionsService definitionsService;
  
 -    private SchedulerService schedulerService;
 -
      private SegmentService segmentService;
  
      private Integer purgeProfileExistTime = 0;
@@@ -235,62 -489,48 +334,85 @@@
      }
  
      private void initializePurge() {
+         LOGGER.info("Purge: Initializing");
+ 
 +        if (purgeProfileExistTime <= 0 && purgeProfileInactiveTime <= 0 && 
purgeSessionExistTime <= 0 && purgeEventExistTime <= 0) {
 +            return;
 +        }
 +
+         if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || 
purgeSessionExistTime > 0 || purgeEventExistTime > 0) {
+             if (purgeProfileInactiveTime > 0) {
+                 LOGGER.info("Purge: Profile with no visits since more than {} 
days, will be purged", purgeProfileInactiveTime);
+             }
+             if (purgeProfileExistTime > 0) {
+                 LOGGER.info("Purge: Profile created since more than {} days, 
will be purged", purgeProfileExistTime);
+             }
+ 
+             if (purgeSessionExistTime > 0) {
+                 LOGGER.info("Purge: Session items created since more than {} 
days, will be purged", purgeSessionExistTime);
+             }
+             if (purgeEventExistTime > 0) {
+                 LOGGER.info("Purge: Event items created since more than {} 
days, will be purged", purgeEventExistTime);
+             }
+ 
 -            purgeTask = new TimerTask() {
 -                @Override
 -                public void run() {
 +        // Register the task executor for profile purge
 +        TaskExecutor profilePurgeExecutor = new TaskExecutor() {
 +            @Override
 +            public String getTaskType() {
 +                return "profile-purge";
 +            }
 +
 +            @Override
 +            public void execute(ScheduledTask task, 
TaskExecutor.TaskStatusCallback callback) {
 +                contextManager.executeAsSystem(() -> {
                      try {
+                         long purgeStartTime = System.currentTimeMillis();
+                         LOGGER.info("Purge: triggered");
+ 
+                         // Profile purge
                          purgeProfiles(purgeProfileInactiveTime, 
purgeProfileExistTime);
 -
 -                        // Monthly items purge
 -                        purgeSessionItems(purgeSessionExistTime);
 -                        purgeEventItems(purgeEventExistTime);
 +                        if (purgeSessionExistTime > 0) {
 +                            purgeSessionItems(purgeSessionExistTime);
 +                        }
 +                        if (purgeEventExistTime > 0) {
 +                            purgeEventItems(purgeEventExistTime);
 +                        }
+                         LOGGER.info("Purge: executed in {} ms", 
System.currentTimeMillis() - purgeStartTime);
++                        
 +                        callback.complete();
                      } catch (Throwable t) {
 -                        LOGGER.error("Error while purging", t);
 +                        // During shutdown, services may be unavailable - 
only log if not shutting down
 +                        LOGGER.error("Error while purging profiles, sessions, 
or events", t);
 +                        callback.fail(t.getMessage());
                      }
 -                }
 -            };
 -
 -            
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1, 
purgeProfileInterval, TimeUnit.DAYS);
 +                    return null;
 +                });
 +            }
 +        };
  
 -            LOGGER.info("Purge: purge scheduled with an interval of {} days", 
purgeProfileInterval);
 +        schedulerService.registerTaskExecutor(profilePurgeExecutor);
 +
 +        // Check if a purge task already exists
 +        List<ScheduledTask> existingTasks = 
schedulerService.getTasksByType("profile-purge", 0, 1, null).getList();
 +        if (!existingTasks.isEmpty() && existingTasks.get(0).isSystemTask()) {
 +            // Reuse the existing task if it's a system task
 +            purgeTask = existingTasks.get(0);
 +            // Update task configuration if needed
 +            purgeTask.setPeriod(purgeProfileInterval);
 +            purgeTask.setTimeUnit(TimeUnit.DAYS);
 +            purgeTask.setFixedRate(true);
 +            purgeTask.setEnabled(true);
 +            schedulerService.saveTask(purgeTask);
 +            LOGGER.info("Reusing existing system purge task: {}", 
purgeTask.getItemId());
          } else {
 -            LOGGER.info("Purge: No purge scheduled");
 +            // Create a new task if none exists or existing one isn't a 
system task
 +            purgeTask = schedulerService.newTask("profile-purge")
 +                .withPeriod(purgeProfileInterval, TimeUnit.DAYS)
 +                .withFixedRate()  // Run at fixed intervals
 +                // By default tasks run on a single node, no need to 
explicitly set it
 +                .asSystemTask() // Mark as a system task
 +                .schedule();
 +            LOGGER.info("Created new system purge task: {}", 
purgeTask.getItemId());
          }
      }
  

Reply via email to