jkevan commented on a change in pull request #260:
URL: https://github.com/apache/unomi/pull/260#discussion_r591637316



##########
File path: services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
##########
@@ -839,20 +839,25 @@ private void updateExistingProfilesForPastEventCondition(Condition eventConditio
 
         String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
 
+        int updatedProfileCount = 0;
         if(pastEventsDisablePartitions) {
             Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
-            updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+            updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
         } else {
             Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+                updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
             }
         }
 
-        logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
+        if (forceRefresh && updatedProfileCount > 0) {

Review comment:
   That's the point of the PR: I'm not using the bulk processor anymore.
   But to answer: you cannot be sure, even if you call bulkProcessor.flush().
   The bulkProcessor performs the update asynchronously in a separate thread.
   I debugged this part with logs and the bulkProcessor listener, and I could see that the refresh always came before the bulkProcessor performed the request.
   
   For the current PR, I'm now using this function to do the update (it does not use the bulk processor): 
https://github.com/apache/unomi/blob/master/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java#L967-L997
   I chose it because I cannot control the bulk processor's asynchronous behavior, and I need the past event properties to be saved before the segment save tries to update the profiles.
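
   To illustrate the ordering problem described above, here is a minimal, self-contained sketch. It does not use Unomi or Elasticsearch code: `FakeIndex`, `asyncThenRefresh` and `syncThenRefresh` are hypothetical names, and the latch only forces the ordering reported in the comment (the refresh running before the asynchronous write) so that the result is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RefreshOrderingSketch {

    /** Hypothetical stand-in for an index: writes become searchable only after refresh(). */
    static class FakeIndex {
        private final Map<String, Long> written = new ConcurrentHashMap<>();
        private final Map<String, Long> searchable = new ConcurrentHashMap<>();

        void write(String id, long value) { written.put(id, value); }
        void refresh() { searchable.putAll(written); }
        int searchableCount() { return searchable.size(); }
    }

    /** Asynchronous path: the write is handed to another thread, then the caller refreshes. */
    static int asyncThenRefresh() {
        FakeIndex index = new FakeIndex();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch refreshDone = new CountDownLatch(1);
        try {
            worker.submit(() -> {
                // Assumption: reproduce the observed ordering, the write runs after the refresh.
                refreshDone.await();
                index.write("profile-1", 3L);
                return null;
            });
            index.refresh();                       // nothing has been written yet
            int visible = index.searchableCount();
            refreshDone.countDown();               // only now does the worker perform the write
            return visible;
        } finally {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Synchronous path: the write completes on the calling thread before the refresh. */
    static int syncThenRefresh() {
        FakeIndex index = new FakeIndex();
        index.write("profile-1", 3L);
        index.refresh();
        return index.searchableCount();
    }

    public static void main(String[] args) {
        System.out.println("async then refresh, visible docs: " + asyncThenRefresh()); // 0
        System.out.println("sync then refresh, visible docs: " + syncThenRefresh());   // 1
    }
}
```

   In the asynchronous variant the refresh sees no documents because the write has not happened yet. In the synchronous variant the write is complete before the refresh is issued, which is the guarantee needed before the segment save updates the profiles.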





