jkevan commented on a change in pull request #260:
URL: https://github.com/apache/unomi/pull/260#discussion_r591637316
##########
File path:
services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
##########
@@ -839,20 +839,25 @@ private void updateExistingProfilesForPastEventCondition(Condition eventConditio
         String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
+        int updatedProfileCount = 0;
         if(pastEventsDisablePartitions) {
             Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
-            updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+            updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
         } else {
             Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+                updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
             }
         }
-        logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t);
+        if (forceRefresh && updatedProfileCount > 0) {
Review comment:
That's the point of the PR: I'm not using the bulk processor anymore.
But to answer the question: you cannot be sure, even if you call
bulkProcessor.flush(). The bulkProcessor performs the update asynchronously
in a separate thread.
I debugged this part with logs and the bulkProcessor listener, and I could
see that the refresh always comes before the bulkProcessor performs the
request.
For the current PR, I'm now using this function to do the update (it does
not use the bulk processor):
https://github.com/apache/unomi/blob/master/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java#L967-L997
I made this change mainly because I cannot control the bulk processor's
asynchronous behavior, and I need the past event properties to be saved
before the segment save tries to update the profiles.
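
To illustrate the ordering problem described above, here is a minimal
sketch that uses only the JDK. Everything in it is a hypothetical stand-in:
ToyIndex, asyncUpdateThenRefresh and syncUpdateThenRefresh are not Unomi or
Elasticsearch APIs, and the latch deliberately forces the order I observed in
the logs (refresh first, asynchronous write second) instead of leaving it to
thread scheduling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RefreshOrderingSketch {

    /** Toy index (assumption): writes become searchable only after refresh(). */
    static final class ToyIndex {
        private final Map<String, Long> pending = new HashMap<>();
        private final Map<String, Long> searchable = new HashMap<>();

        synchronized void write(String profileId, long count) {
            pending.put(profileId, count);
        }

        synchronized void refresh() {
            searchable.putAll(pending);
            pending.clear();
        }

        synchronized boolean isSearchable(String profileId) {
            return searchable.containsKey(profileId);
        }
    }

    /** The write is handed to another thread, similar to what a bulk processor does. */
    static boolean asyncUpdateThenRefresh() {
        ToyIndex index = new ToyIndex();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch refreshDone = new CountDownLatch(1);
        try {
            worker.execute(() -> {
                try {
                    // Forced ordering for the demo: the write waits for the refresh.
                    refreshDone.await();
                    index.write("profile-1", 3L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            index.refresh();            // the caller refreshes right away
            refreshDone.countDown();    // only now does the queued write run
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
            return index.isSearchable("profile-1");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    /** The write completes on the calling thread before the refresh. */
    static boolean syncUpdateThenRefresh() {
        ToyIndex index = new ToyIndex();
        index.write("profile-1", 3L);
        index.refresh();
        return index.isSearchable("profile-1");
    }

    public static void main(String[] args) {
        System.out.println("async then refresh, searchable: " + asyncUpdateThenRefresh()); // false
        System.out.println("sync then refresh, searchable:  " + syncUpdateThenRefresh());  // true
    }
}
```

In the asynchronous variant the refresh has nothing to publish, so the
property is still invisible when the next step queries for it. The
synchronous variant is the behavior this PR relies on: the update has
completed by the time the refresh runs.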