This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new b73321905 [Fix #2113] Data index group processing (#2114)
b73321905 is described below

commit b733219052f6955ce191fb0c8e5efefd6a3ca485
Author: Francisco Javier Tirado Sarti <[email protected]>
AuthorDate: Wed Oct 16 16:34:01 2024 +0200

    [Fix #2113] Data index group processing (#2114)
    
    * [Fix #2113] Data index group processing
    
    * [Fix #2113] Support group containing different process instance ids
    
    * [Fix #2113] Flush call is probably not needed
    
    * [Fix #2113] Optimization for user tasks
    
    * Update data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java
    
    Co-authored-by: Gonzalo Muñoz <[email protected]>
    
    ---------
    
    Co-authored-by: Gonzalo Muñoz <[email protected]>
---
 .../kie/kogito/index/service/IndexingService.java  |  23 +----
 .../index/storage/ProcessInstanceStorage.java      |   3 +
 .../index/storage/UserTaskInstanceStorage.java     |   3 +
 .../index/storage/ModelProcessInstanceStorage.java |  18 ++++
 .../storage/ModelUserTaskInstanceStorage.java      |  21 +++-
 .../jpa/storage/ProcessInstanceEntityStorage.java  |  75 ++++++++------
 .../jpa/storage/UserTaskInstanceEntityStorage.java | 109 +++++++++++++++------
 7 files changed, 174 insertions(+), 78 deletions(-)

diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java
index f3cd153d5..5fbf4d653 100644
--- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java
+++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java
@@ -79,15 +79,8 @@ public class IndexingService {
     public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
         ProcessInstanceStorage storage = manager.getProcessInstanceStorage();
         if (event instanceof MultipleProcessInstanceDataEvent) {
-            for (ProcessInstanceDataEvent<?> item : 
((MultipleProcessInstanceDataEvent) event).getData())
-                indexProccessInstanceEvent(storage, item);
-        } else {
-            indexProccessInstanceEvent(storage, event);
-        }
-    }
-
-    private void indexProccessInstanceEvent(ProcessInstanceStorage storage, 
ProcessInstanceDataEvent<?> event) {
-        if (event instanceof ProcessInstanceErrorDataEvent) {
+            storage.indexGroup(((MultipleProcessInstanceDataEvent) event));
+        } else if (event instanceof ProcessInstanceErrorDataEvent) {
             storage.indexError((ProcessInstanceErrorDataEvent) event);
         } else if (event instanceof ProcessInstanceNodeDataEvent) {
             storage.indexNode((ProcessInstanceNodeDataEvent) event);
@@ -112,16 +105,8 @@ public class IndexingService {
     public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> 
event) {
         UserTaskInstanceStorage storage = manager.getUserTaskInstanceStorage();
         if (event instanceof MultipleUserTaskInstanceDataEvent) {
-            for (UserTaskInstanceDataEvent<?> item : 
((MultipleUserTaskInstanceDataEvent) event).getData()) {
-                indexUserTaskInstanceEvent(storage, item);
-            }
-        } else {
-            indexUserTaskInstanceEvent(storage, event);
-        }
-    }
-
-    private void indexUserTaskInstanceEvent(UserTaskInstanceStorage storage, 
UserTaskInstanceDataEvent<?> event) {
-        if (event instanceof UserTaskInstanceAssignmentDataEvent) {
+            storage.indexGroup((MultipleUserTaskInstanceDataEvent) event);
+        } else if (event instanceof UserTaskInstanceAssignmentDataEvent) {
             storage.indexAssignment((UserTaskInstanceAssignmentDataEvent) 
event);
         } else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
             storage.indexAttachment((UserTaskInstanceAttachmentDataEvent) 
event);
diff --git a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java
index 753caf66b..674cc17bd 100644
--- a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java
+++ b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java
@@ -18,6 +18,7 @@
  */
 package org.kie.kogito.index.storage;
 
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
@@ -28,6 +29,8 @@ import org.kie.kogito.persistence.api.StorageFetcher;
 
 public interface ProcessInstanceStorage extends StorageFetcher<String, 
ProcessInstance> {
 
+    void indexGroup(MultipleProcessInstanceDataEvent event);
+
     void indexError(ProcessInstanceErrorDataEvent event);
 
     void indexNode(ProcessInstanceNodeDataEvent event);
diff --git a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java
index f315ae4f7..58c586fd6 100644
--- a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java
+++ b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java
@@ -18,6 +18,7 @@
  */
 package org.kie.kogito.index.storage;
 
+import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
@@ -40,4 +41,6 @@ public interface UserTaskInstanceStorage extends 
StorageFetcher<String, UserTask
     void indexComment(UserTaskInstanceCommentDataEvent event);
 
     void indexVariable(UserTaskInstanceVariableDataEvent event);
+
+    void indexGroup(MultipleUserTaskInstanceDataEvent event);
 }
diff --git a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java
index 9f21497fc..07b645bb7 100644
--- a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java
+++ b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java
@@ -18,6 +18,7 @@
  */
 package org.kie.kogito.index.storage;
 
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
@@ -69,6 +70,23 @@ public class ModelProcessInstanceStorage extends 
ModelStorageFetcher<String, Pro
         index(event, variableMerger);
     }
 
+    @Override
+    public void indexGroup(MultipleProcessInstanceDataEvent events) {
+        for (ProcessInstanceDataEvent<?> event : events.getData()) {
+            if (event instanceof ProcessInstanceErrorDataEvent) {
+                index(event, errorMerger);
+            } else if (event instanceof ProcessInstanceNodeDataEvent) {
+                index(event, nodeMerger);
+            } else if (event instanceof ProcessInstanceSLADataEvent) {
+                index(event, slaMerger);
+            } else if (event instanceof ProcessInstanceStateDataEvent) {
+                index(event, stateMerger);
+            } else if (event instanceof ProcessInstanceVariableDataEvent) {
+                index(event, variableMerger);
+            }
+        }
+    }
+
     private <T extends ProcessInstanceDataEvent<?>> void index(T event, 
ProcessInstanceEventMerger merger) {
         ProcessInstance processInstance = 
storage.get(event.getKogitoProcessInstanceId());
         if (processInstance == null) {
diff --git a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelUserTaskInstanceStorage.java b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelUserTaskInstanceStorage.java
index 20f98a396..e76104c9e 100644
--- a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelUserTaskInstanceStorage.java
+++ b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelUserTaskInstanceStorage.java
@@ -20,6 +20,7 @@ package org.kie.kogito.index.storage;
 
 import java.util.ArrayList;
 
+import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
@@ -53,7 +54,6 @@ public class ModelUserTaskInstanceStorage extends 
ModelStorageFetcher<String, Us
     @Override
     public void indexAssignment(UserTaskInstanceAssignmentDataEvent event) {
         index(event, assignmentMerger);
-
     }
 
     @Override
@@ -76,13 +76,30 @@ public class ModelUserTaskInstanceStorage extends 
ModelStorageFetcher<String, Us
     @Override
     public void indexVariable(UserTaskInstanceVariableDataEvent event) {
         index(event, variableMerger);
-
     }
 
     @Override
     public void indexComment(UserTaskInstanceCommentDataEvent event) {
         index(event, commentMerger);
+    }
 
+    @Override
+    public void indexGroup(MultipleUserTaskInstanceDataEvent events) {
+        for (UserTaskInstanceDataEvent<?> event : events.getData()) {
+            if (event instanceof UserTaskInstanceAssignmentDataEvent) {
+                index((UserTaskInstanceAssignmentDataEvent) event, 
assignmentMerger);
+            } else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
+                index((UserTaskInstanceAttachmentDataEvent) event, 
attachmentMerger);
+            } else if (event instanceof UserTaskInstanceDeadlineDataEvent) {
+                index((UserTaskInstanceDeadlineDataEvent) event, 
deadlineMerger);
+            } else if (event instanceof UserTaskInstanceStateDataEvent) {
+                index((UserTaskInstanceStateDataEvent) event, stateMerger);
+            } else if (event instanceof UserTaskInstanceCommentDataEvent) {
+                index((UserTaskInstanceCommentDataEvent) event, commentMerger);
+            } else if (event instanceof UserTaskInstanceVariableDataEvent) {
+                index((UserTaskInstanceVariableDataEvent) event, 
variableMerger);
+            }
+        }
     }
 
     private <T extends UserTaskInstanceDataEvent<?>> void index(T event, 
UserTaskInstanceEventMerger merger) {
diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java
index 386aabcf7..b991c1a83 100644
--- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java
+++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java
@@ -20,9 +20,12 @@ package org.kie.kogito.index.jpa.storage;
 
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
-import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
 import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
@@ -64,43 +67,51 @@ public class ProcessInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Stri
         super(repository, ProcessInstanceEntity.class, mapper::mapToModel);
     }
 
+    @Override
+    @Transactional
+    public void indexGroup(MultipleProcessInstanceDataEvent events) {
+        Map<String, ProcessInstanceEntity> piMap = new HashMap<>();
+        for (ProcessInstanceDataEvent<?> event : events.getData()) {
+            indexEvent(piMap.computeIfAbsent(event.getKogitoProcessInstanceId(), id -> findOrInit(event)), event);
+        }
+    }
+
     @Override
     @Transactional
     public void indexError(ProcessInstanceErrorDataEvent event) {
-        indexError(event.getData());
+        indexError(findOrInit(event), event.getData());
     }
 
     @Override
     @Transactional
     public void indexNode(ProcessInstanceNodeDataEvent event) {
-        indexNode(event.getData());
+        indexNode(findOrInit(event), event.getData());
     }
 
     @Override
     @Transactional
     public void indexSLA(ProcessInstanceSLADataEvent event) {
-        indexSLA(event.getData());
-
+        indexSla(findOrInit(event), event.getData());
     }
 
     @Override
     @Transactional
     public void indexState(ProcessInstanceStateDataEvent event) {
-        indexState(event.getData(), event.getKogitoAddons() == null ? Set.of() 
: Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null 
: event.getSource().toString());
+        indexState(findOrInit(event), event);
     }
 
     @Override
     @Transactional
     public void indexVariable(ProcessInstanceVariableDataEvent event) {
-        indexVariable(event.getData());
+        indexVariable(findOrInit(event), event.getData());
     }
 
-    private ProcessInstanceEntity findOrInit(String processId, String 
processInstanceId, Date date) {
-        return repository.findByIdOptional(processInstanceId).orElseGet(() -> {
+    private ProcessInstanceEntity findOrInit(ProcessInstanceDataEvent<?> 
event) {
+        return 
repository.findByIdOptional(event.getKogitoProcessInstanceId()).orElseGet(() -> 
{
             ProcessInstanceEntity pi = new ProcessInstanceEntity();
-            pi.setProcessId(processId);
-            pi.setId(processInstanceId);
-            pi.setLastUpdate(toZonedDateTime(date));
+            pi.setProcessId(event.getKogitoProcessId());
+            pi.setId(event.getKogitoProcessInstanceId());
+            pi.setLastUpdate(toZonedDateTime(event.getTime()));
             pi.setNodes(new ArrayList<>());
             pi.setMilestones(new ArrayList<>());
             repository.persist(pi);
@@ -108,8 +119,21 @@ public class ProcessInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Stri
         });
     }
 
-    private void indexError(ProcessInstanceErrorEventBody error) {
-        ProcessInstanceEntity pi = findOrInit(error.getProcessId(), 
error.getProcessInstanceId(), error.getEventDate());
+    private void indexEvent(ProcessInstanceEntity pi, 
ProcessInstanceDataEvent<?> event) {
+        if (event instanceof ProcessInstanceErrorDataEvent) {
+            indexError(pi, ((ProcessInstanceErrorDataEvent) event).getData());
+        } else if (event instanceof ProcessInstanceNodeDataEvent) {
+            indexNode(pi, ((ProcessInstanceNodeDataEvent) event).getData());
+        } else if (event instanceof ProcessInstanceSLADataEvent) {
+            indexSla(pi, ((ProcessInstanceSLADataEvent) event).getData());
+        } else if (event instanceof ProcessInstanceStateDataEvent) {
+            indexState(pi, (ProcessInstanceStateDataEvent) event);
+        } else if (event instanceof ProcessInstanceVariableDataEvent) {
+            indexVariable(pi, ((ProcessInstanceVariableDataEvent) 
event).getData());
+        }
+    }
+
+    private void indexError(ProcessInstanceEntity pi, 
ProcessInstanceErrorEventBody error) {
         ProcessInstanceErrorEntity errorEntity = pi.getError();
         if (errorEntity == null) {
             errorEntity = new ProcessInstanceErrorEntity();
@@ -118,16 +142,13 @@ public class ProcessInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Stri
         errorEntity.setMessage(error.getErrorMessage());
         errorEntity.setNodeDefinitionId(error.getNodeDefinitionId());
         pi.setState(CommonUtils.ERROR_STATE);
-        repository.flush();
     }
 
-    private void indexNode(ProcessInstanceNodeEventBody data) {
-        ProcessInstanceEntity pi = findOrInit(data.getProcessId(), 
data.getProcessInstanceId(), data.getEventDate());
+    private void indexNode(ProcessInstanceEntity pi, 
ProcessInstanceNodeEventBody data) {
         pi.getNodes().stream().filter(n -> 
n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> 
updateNode(n, data), () -> createNode(pi, data));
         if ("MilestoneNode".equals(data.getNodeType())) {
             pi.getMilestones().stream().filter(n -> 
n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> 
updateMilestone(n, data), () -> createMilestone(pi, data));
         }
-        repository.flush();
     }
 
     private MilestoneEntity createMilestone(ProcessInstanceEntity pi, 
ProcessInstanceNodeEventBody data) {
@@ -159,7 +180,6 @@ public class ProcessInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Stri
         nodeInstance.setType(body.getNodeType());
         ZonedDateTime eventDate = toZonedDateTime(body.getEventDate());
         switch (body.getEventType()) {
-
             case EVENT_TYPE_ENTER:
                 nodeInstance.setEnter(eventDate);
                 break;
@@ -174,13 +194,12 @@ public class ProcessInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Stri
         return nodeInstance;
     }
 
-    private void indexSLA(ProcessInstanceSLAEventBody data) {
-        findOrInit(data.getProcessId(), data.getProcessInstanceId(), 
data.getEventDate());
-        repository.flush();
+    private void indexState(ProcessInstanceEntity pi, 
ProcessInstanceStateDataEvent event) {
+        indexState(pi, event.getData(), (event.getKogitoAddons() == null || 
event.getKogitoAddons().isEmpty()) ? Set.of() : 
Set.of(event.getKogitoAddons().split(",")),
+                event.getSource() == null ? null : 
event.getSource().toString());
     }
 
-    private void indexState(ProcessInstanceStateEventBody data, Set<String> 
addons, String endpoint) {
-        ProcessInstanceEntity pi = findOrInit(data.getProcessId(), 
data.getProcessInstanceId(), data.getEventDate());
+    private void indexState(ProcessInstanceEntity pi, 
ProcessInstanceStateEventBody data, Set<String> addons, String endpoint) {
         pi.setVersion(data.getProcessVersion());
         pi.setProcessName(data.getProcessName());
         pi.setRootProcessInstanceId(data.getRootProcessInstanceId());
@@ -199,13 +218,13 @@ public class ProcessInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Stri
         pi.setLastUpdate(toZonedDateTime(data.getEventDate()));
         pi.setAddons(addons);
         pi.setEndpoint(endpoint);
-        repository.flush();
     }
 
-    private void indexVariable(ProcessInstanceVariableEventBody data) {
-        ProcessInstanceEntity pi = findOrInit(data.getProcessId(), 
data.getProcessInstanceId(), data.getEventDate());
+    private void indexVariable(ProcessInstanceEntity pi, 
ProcessInstanceVariableEventBody data) {
         pi.setVariables(JsonUtils.mergeVariable(data.getVariableName(), 
data.getVariableValue(), pi.getVariables()));
-        repository.flush();
     }
 
+    private void indexSla(ProcessInstanceEntity orInit, 
ProcessInstanceSLAEventBody data) {
+        // SLA does nothing for now
+    }
 }
diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java
index 1e2c127a3..cb75b1e31 100644
--- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java
+++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java
@@ -19,15 +19,19 @@
 package org.kie.kogito.index.jpa.storage;
 
 import java.net.URI;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
+import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentEventBody;
 import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentEventBody;
 import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceCommentEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceStateEventBody;
@@ -66,20 +70,69 @@ public class UserTaskInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Str
         super(repository, UserTaskInstanceEntity.class, mapper::mapToModel);
     }
 
-    private UserTaskInstanceEntity findOrInit(String taskId) {
-        return repository.findByIdOptional(taskId).orElseGet(() -> {
-            UserTaskInstanceEntity ut = new UserTaskInstanceEntity();
-            ut.setId(taskId);
-            repository.persist(ut);
-            return ut;
-        });
+    @Override
+    @Transactional
+    public void indexGroup(MultipleUserTaskInstanceDataEvent events) {
+        Map<String, UserTaskInstanceEntity> taskMap = new HashMap<>();
+        for (UserTaskInstanceDataEvent<?> event : events.getData()) {
+            indexEvent(taskMap.computeIfAbsent(event.getKogitoUserTaskInstanceId(), id -> findOrInit(id)), event);
+        }
     }
 
     @Override
     @Transactional
     public void indexAssignment(UserTaskInstanceAssignmentDataEvent event) {
+        indexAssignment(findOrInit(event), event);
+    }
+
+    @Override
+    @Transactional
+    public void indexAttachment(UserTaskInstanceAttachmentDataEvent event) {
+        indexAttachment(findOrInit(event), event);
+    }
+
+    @Override
+    @Transactional
+    public void indexDeadline(UserTaskInstanceDeadlineDataEvent event) {
+        indexDeadline(findOrInit(event), event);
+    }
+
+    @Override
+    @Transactional
+    public void indexState(UserTaskInstanceStateDataEvent event) {
+        indexState(findOrInit(event), event);
+    }
+
+    @Override
+    @Transactional
+    public void indexComment(UserTaskInstanceCommentDataEvent event) {
+        indexComment(findOrInit(event), event);
+    }
+
+    @Override
+    @Transactional
+    public void indexVariable(UserTaskInstanceVariableDataEvent event) {
+        indexVariable(findOrInit(event), event);
+    }
+
+    private void indexEvent(UserTaskInstanceEntity task, 
UserTaskInstanceDataEvent<?> event) {
+        if (event instanceof UserTaskInstanceAssignmentDataEvent) {
+            indexAssignment(task, (UserTaskInstanceAssignmentDataEvent) event);
+        } else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
+            indexAttachment(task, (UserTaskInstanceAttachmentDataEvent) event);
+        } else if (event instanceof UserTaskInstanceDeadlineDataEvent) {
+            indexDeadline(task, (UserTaskInstanceDeadlineDataEvent) event);
+        } else if (event instanceof UserTaskInstanceStateDataEvent) {
+            indexState(task, (UserTaskInstanceStateDataEvent) event);
+        } else if (event instanceof UserTaskInstanceCommentDataEvent) {
+            indexComment(task, (UserTaskInstanceCommentDataEvent) event);
+        } else if (event instanceof UserTaskInstanceVariableDataEvent) {
+            indexVariable(task, (UserTaskInstanceVariableDataEvent) event);
+        }
+    }
+
+    private void indexAssignment(UserTaskInstanceEntity userTaskInstance, 
UserTaskInstanceAssignmentDataEvent event) {
         UserTaskInstanceAssignmentEventBody body = event.getData();
-        UserTaskInstanceEntity userTaskInstance = 
findOrInit(event.getKogitoUserTaskInstanceId());
         switch (body.getAssignmentType()) {
             case "USER_OWNERS":
                 userTaskInstance.setPotentialUsers(new 
HashSet<>(body.getUsers()));
@@ -97,13 +150,9 @@ public class UserTaskInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Str
                 userTaskInstance.setAdminUsers(new HashSet<>(body.getUsers()));
                 break;
         }
-        repository.flush();
     }
 
-    @Override
-    @Transactional
-    public void indexAttachment(UserTaskInstanceAttachmentDataEvent event) {
-        UserTaskInstanceEntity userTaskInstance = 
findOrInit(event.getKogitoUserTaskInstanceId());
+    private void indexAttachment(UserTaskInstanceEntity userTaskInstance, 
UserTaskInstanceAttachmentDataEvent event) {
         UserTaskInstanceAttachmentEventBody body = event.getData();
         List<AttachmentEntity> attachments = userTaskInstance.getAttachments();
         switch (body.getEventType()) {
@@ -127,17 +176,12 @@ public class UserTaskInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Str
         }
     }
 
-    @Override
-    @Transactional
-    public void indexDeadline(UserTaskInstanceDeadlineDataEvent event) {
-        findOrInit(event.getKogitoUserTaskInstanceId());
+    private void indexDeadline(UserTaskInstanceEntity userTaskInstance, 
UserTaskInstanceDeadlineDataEvent event) {
+        // deadlines ignored for now
     }
 
-    @Override
-    @Transactional
-    public void indexState(UserTaskInstanceStateDataEvent event) {
+    private void indexState(UserTaskInstanceEntity task, 
UserTaskInstanceStateDataEvent event) {
         UserTaskInstanceStateEventBody body = event.getData();
-        UserTaskInstanceEntity task = 
findOrInit(event.getKogitoUserTaskInstanceId());
         task.setProcessInstanceId(body.getProcessInstanceId());
         task.setProcessId(event.getKogitoProcessId());
         task.setRootProcessId(event.getKogitoRootProcessId());
@@ -163,11 +207,8 @@ public class UserTaskInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Str
         return source.toString() + format("/%s/%s/%s", pId, name, taskId);
     }
 
-    @Override
-    @Transactional
-    public void indexComment(UserTaskInstanceCommentDataEvent event) {
+    private void indexComment(UserTaskInstanceEntity userTaskInstance, 
UserTaskInstanceCommentDataEvent event) {
         UserTaskInstanceCommentEventBody body = event.getData();
-        UserTaskInstanceEntity userTaskInstance = 
findOrInit(event.getKogitoUserTaskInstanceId());
         List<CommentEntity> comments = userTaskInstance.getComments();
         switch (body.getEventType()) {
             case UserTaskInstanceCommentEventBody.EVENT_TYPE_ADDED:
@@ -190,10 +231,7 @@ public class UserTaskInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Str
         }
     }
 
-    @Override
-    @Transactional
-    public void indexVariable(UserTaskInstanceVariableDataEvent event) {
-        UserTaskInstanceEntity userTaskInstance = 
findOrInit(event.getKogitoUserTaskInstanceId());
+    private void indexVariable(UserTaskInstanceEntity userTaskInstance, 
UserTaskInstanceVariableDataEvent event) {
         UserTaskInstanceVariableEventBody body = event.getData();
         if (body.getVariableType().equals("INPUT")) {
             ObjectNode objectNode = userTaskInstance.getInputs();
@@ -211,4 +249,17 @@ public class UserTaskInstanceEntityStorage extends 
AbstractJPAStorageFetcher<Str
             userTaskInstance.setOutputs(objectNode);
         }
     }
+
+    private UserTaskInstanceEntity findOrInit(UserTaskInstanceDataEvent<?> 
event) {
+        return findOrInit(event.getKogitoUserTaskInstanceId());
+    }
+
+    private UserTaskInstanceEntity findOrInit(String taskId) {
+        return repository.findByIdOptional(taskId).orElseGet(() -> {
+            UserTaskInstanceEntity ut = new UserTaskInstanceEntity();
+            ut.setId(taskId);
+            repository.persist(ut);
+            return ut;
+        });
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to