tiagodolphine commented on code in PR #1906:
URL:
https://github.com/apache/incubator-kie-kogito-apps/pull/1906#discussion_r1390935004
##########
data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java:
##########
@@ -69,6 +71,34 @@ public class IndexingService {
Instance<UserTaskInstanceEventMerger> userTaskInstanceMergers;
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
+ //retry in case of rare but possible race condition during the insert
for the first registry
+ ProcessInstance pi = executeWithRetry(event,
this::handleProcessInstanceEvent, "indexing process instance");
+
+ ProcessDefinition definition = pi.getDefinition();
+
+ if (definition != null &&
!manager.getProcessDefinitionsCache().containsKey(definition.getKey())) {
+ //retry in case of rare but possible race condition during the
insert for the first registry
+ executeWithRetry(definition, this::handleProcessDefinition,
"indexing process definition");
Review Comment:
@elguardian 👍 to the use of a queue strategy, this was brought in some
discussions, for example with kafka as a requirement we could rely on the
partition+key to guarantee the consumption as a queue per process instance id,
but with http+knative this is more difficult, we probably would need some
control in DB for example, anyway, the goal of this PR is just to be a quick
fix to unblock some use cases and to open a follow up PR after a design of a
more elegant solution for such issue, and to TBH this might occur on other
types of events not only the ones related to the process instance.
cc @wmedvede @fjtirado ˆ
##########
data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java:
##########
@@ -69,6 +71,34 @@ public class IndexingService {
Instance<UserTaskInstanceEventMerger> userTaskInstanceMergers;
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
+ //retry in case of rare but possible race condition during the insert
for the first registry
+ ProcessInstance pi = executeWithRetry(event,
this::handleProcessInstanceEvent, "indexing process instance");
+
+ ProcessDefinition definition = pi.getDefinition();
+
+ if (definition != null &&
!manager.getProcessDefinitionsCache().containsKey(definition.getKey())) {
+ //retry in case of rare but possible race condition during the
insert for the first registry
+ executeWithRetry(definition, this::handleProcessDefinition,
"indexing process definition");
Review Comment:
@elguardian 👍 to the use of a queue strategy, this was brought in some
discussions, for example with kafka as a requirement we could rely on the
partition+key to guarantee the consumption as a queue per process instance id,
but with http+knative this is more difficult, we probably would need some
control in DB for example, anyway, the goal of this PR is just to be a quick
fix to unblock some use cases and to open a follow up PR after a design of a
more elegant solution for such issue, and to TBH this might occur on other
types of events not only the ones related to the process instance.
cc @wmedvede @fjtirado ˆ
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]