This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c826524a0 [Fix_#3413] Speed up businesskey query performance (#3441)
1c826524a0 is described below

commit 1c826524a029d0de5db8ccca4cbf1875edd49a9b
Author: Francisco Javier Tirado Sarti <[email protected]>
AuthorDate: Sat Mar 16 12:35:56 2024 +0100

    [Fix_#3413] Speed up businesskey query performance (#3441)
    
    * [Fix #3413] Speed up business key query performance
    
    * [Fix #3413] Walters comments
    
    * [Fix #3413] Fixing flaky test
---
 .../kogito/persistence/jdbc/GenericRepository.java | 36 +++++++++++++++++-----
 .../persistence/jdbc/JDBCProcessInstances.java     |  8 ++++-
 .../kie/kogito/persistence/jdbc/Repository.java    |  6 +++-
 .../db/ansi/V10.0.0__add_business_key_ansi.sql     | 11 +++++++
 .../V10.0.0__add_business_key_PostgreSQL.sql       | 12 ++++++++
 .../jdbc/AbstractProcessInstancesIT.java           |  5 +--
 .../org/kie/kogito/process/ProcessInstances.java   |  6 +++-
 7 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java
index 58fc611b17..9dd066ee9f 100644
--- a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java
+++ b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java
@@ -34,16 +34,11 @@ import java.util.stream.StreamSupport;
 
 import javax.sql.DataSource;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class GenericRepository extends Repository {
 
     private static final String PAYLOAD = "payload";
     private static final String VERSION = "version";
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GenericRepository.class);
-
     private final DataSource dataSource;
 
     public GenericRepository(DataSource dataSource) {
@@ -51,17 +46,25 @@ public class GenericRepository extends Repository {
     }
 
     @Override
-    void insertInternal(String processId, String processVersion, UUID id, 
byte[] payload) {
+    void insertInternal(String processId, String processVersion, UUID id, 
byte[] payload, String businessKey) {
         try (Connection connection = dataSource.getConnection();
                 PreparedStatement statement = 
connection.prepareStatement(INSERT)) {
-            statement.setString(1, id.toString());
+            String processInstanceId = id.toString();
+            statement.setString(1, processInstanceId);
             statement.setBytes(2, payload);
             statement.setString(3, processId);
             statement.setString(4, processVersion);
             statement.setLong(5, 0L);
             statement.executeUpdate();
+            if (businessKey != null) {
+                try (PreparedStatement businessKeyStmt = 
connection.prepareStatement(INSERT_BUSINESS_KEY)) {
+                    businessKeyStmt.setString(1, businessKey);
+                    businessKeyStmt.setString(2, processInstanceId);
+                    businessKeyStmt.executeUpdate();
+                }
+            }
         } catch (Exception e) {
-            throw uncheckedException(e, "Error inserting process instance %s", 
id);
+            throw uncheckedException(e, "Error inserting process instance id: 
%s, processId: %s processVersion: %s business key: %s", id, processId, 
processVersion, businessKey);
         }
     }
 
@@ -140,6 +143,23 @@ public class GenericRepository extends Repository {
         return Optional.empty();
     }
 
+    @Override
+    Optional<Record> findByBusinessKey(String processId, String 
processVersion, String businessKey) {
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement statement = 
connection.prepareStatement(sqlIncludingVersion(FIND_BY_BUSINESS_KEY, 
processVersion))) {
+            statement.setString(1, businessKey);
+            statement.setString(2, processId);
+            if (processVersion != null) {
+                statement.setString(3, processVersion);
+            }
+            try (ResultSet resultSet = statement.executeQuery()) {
+                return resultSet.next() ? Optional.of(from(resultSet)) : 
Optional.empty();
+            }
+        } catch (Exception e) {
+            throw uncheckedException(e, "Error finding process instance. 
Business key: %s, Process Id: %s, Process version: %s", businessKey, processId, 
processVersion);
+        }
+    }
+
     private static class CloseableWrapper implements Runnable {
 
         private Deque<AutoCloseable> wrapped = new ArrayDeque<>();
diff --git a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java
index 789da4f5c7..26c089fecf 100644
--- a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java
+++ b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java
@@ -60,7 +60,7 @@ public class JDBCProcessInstances implements 
MutableProcessInstances {
     public void create(String id, ProcessInstance instance) {
         LOGGER.debug("Creating process instance id: {}, processId: {}, 
processVersion: {}", id, process.id(), process.version());
         if (isActive(instance)) {
-            repository.insertInternal(process.id(), process.version(), 
UUID.fromString(id), marshaller.marshallProcessInstance(instance));
+            repository.insertInternal(process.id(), process.version(), 
UUID.fromString(id), marshaller.marshallProcessInstance(instance), 
instance.businessKey());
         } else {
             LOGGER.warn("Skipping create of process instance id: {}, state: 
{}", id, instance.status());
         }
@@ -101,6 +101,12 @@ public class JDBCProcessInstances implements 
MutableProcessInstances {
         return repository.findByIdInternal(process.id(), process.version(), 
UUID.fromString(id)).map(r -> unmarshall(r, mode));
     }
 
+    @Override
+    public Optional<ProcessInstance<?>> findByBusinessKey(String businessKey, 
ProcessInstanceReadMode mode) {
+        LOGGER.debug("Find process instance using business Key : {}", 
businessKey);
+        return repository.findByBusinessKey(process.id(), process.version(), 
businessKey).map(r -> unmarshall(r, mode));
+    }
+
     @Override
     public Stream<ProcessInstance<?>> stream(ProcessInstanceReadMode mode) {
         LOGGER.debug("Find process instance values using mode: {}", mode);
diff --git a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java
index 69ac9bfe69..a1d1036766 100644
--- a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java
+++ b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java
@@ -25,8 +25,10 @@ import java.util.stream.Stream;
 abstract class Repository {
 
     static final String INSERT = "INSERT INTO process_instances (id, payload, 
process_id, process_version, version) VALUES (?, ?, ?, ?, ?)";
+    static final String INSERT_BUSINESS_KEY = "INSERT INTO 
business_key_mapping (business_key,process_instance_id) VALUES (?,?)";
     static final String FIND_ALL = "SELECT payload, version FROM 
process_instances WHERE process_id = ?";
     static final String FIND_BY_ID = "SELECT payload, version FROM 
process_instances WHERE process_id = ? and id = ?";
+    static final String FIND_BY_BUSINESS_KEY = "SELECT payload, version FROM 
process_instances INNER JOIN business_key_mapping ON id = process_instance_id 
WHERE business_key = ? and process_id = ?";
     static final String UPDATE = "UPDATE process_instances SET payload = ? 
WHERE process_id = ? and id = ?";
     static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET 
payload = ?, version = ? WHERE process_id = ? and id = ? and version = ?";
     static final String DELETE = "DELETE FROM process_instances WHERE 
process_id = ? and id = ?";
@@ -51,7 +53,7 @@ abstract class Repository {
         }
     }
 
-    abstract void insertInternal(String processId, String processVersion, UUID 
id, byte[] payload);
+    abstract void insertInternal(String processId, String processVersion, UUID 
id, byte[] payload, String businessKey);
 
     abstract void updateInternal(String processId, String processVersion, UUID 
id, byte[] payload);
 
@@ -61,6 +63,8 @@ abstract class Repository {
 
     abstract Optional<Record> findByIdInternal(String processId, String 
processVersion, UUID id);
 
+    abstract Optional<Record> findByBusinessKey(String processId, String 
processVersion, String businessKey);
+
     abstract Stream<Record> findAllInternal(String processId, String 
processVersion);
 
     protected RuntimeException uncheckedException(Exception ex, String 
message, Object... param) {
diff --git a/addons/common/persistence/jdbc/src/main/resources/db/ansi/V10.0.0__add_business_key_ansi.sql b/addons/common/persistence/jdbc/src/main/resources/db/ansi/V10.0.0__add_business_key_ansi.sql
new file mode 100644
index 0000000000..dad119c2ad
--- /dev/null
+++ b/addons/common/persistence/jdbc/src/main/resources/db/ansi/V10.0.0__add_business_key_ansi.sql
@@ -0,0 +1,11 @@
+CREATE TABLE business_key_mapping (
+     business_key VARCHAR (255) NOT NULL,
+     process_instance_id VARCHAR (36) NOT NULL,
+     CONSTRAINT business_key_primary_key PRIMARY KEY (business_key),
+     CONSTRAINT fk_process_instances 
+     FOREIGN KEY (process_instance_id)
+     REFERENCES process_instances(id)
+     ON DELETE CASCADE
+);
+
+CREATE INDEX idx_business_key_process_instance_id ON business_key_mapping 
(process_instance_id);
diff --git a/addons/common/persistence/jdbc/src/main/resources/db/postgresql/V10.0.0__add_business_key_PostgreSQL.sql b/addons/common/persistence/jdbc/src/main/resources/db/postgresql/V10.0.0__add_business_key_PostgreSQL.sql
new file mode 100644
index 0000000000..4d04dad047
--- /dev/null
+++ b/addons/common/persistence/jdbc/src/main/resources/db/postgresql/V10.0.0__add_business_key_PostgreSQL.sql
@@ -0,0 +1,12 @@
+CREATE TABLE business_key_mapping (
+     business_key character (255) NOT NULL,
+     process_instance_id character (36) NOT NULL,
+     PRIMARY KEY (business_key),
+     CONSTRAINT fk_process_instances 
+     FOREIGN KEY (process_instance_id)
+     REFERENCES process_instances(id)
+     ON DELETE CASCADE
+);
+
+
+CREATE INDEX idx_business_key_process_instance_id ON business_key_mapping 
(process_instance_id);
diff --git a/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java b/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java
index 985bd61348..3769ce2329 100644
--- a/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java
+++ b/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java
@@ -152,13 +152,14 @@ abstract class AbstractProcessInstancesIT {
     @Test
     void testBasicFlow() {
         var factory = new TestProcessInstancesFactory(getDataSource(), lock());
+        final String businessKey = "manolo";
         BpmnProcess process = createProcess(factory, "BPMN2-UserTask.bpmn2");
-        ProcessInstance<BpmnVariables> processInstance = 
process.createInstance(BpmnVariables.create(singletonMap("test",
+        ProcessInstance<BpmnVariables> processInstance = 
process.createInstance(businessKey, BpmnVariables.create(singletonMap("test",
                 "test")));
         processInstance.start();
 
         JDBCProcessInstances processInstances = (JDBCProcessInstances) 
process.instances();
-        Optional<?> foundOne = processInstances.findById(processInstance.id());
+        Optional<?> foundOne = processInstances.findByBusinessKey(businessKey);
         BpmnProcessInstance instanceOne = (BpmnProcessInstance) foundOne.get();
         processInstances.update(processInstance.id(), instanceOne);
 
diff --git a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
index 6cc0d95649..a0ce2e8c42 100644
--- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
+++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
@@ -30,7 +30,11 @@ public interface ProcessInstances<T> {
     Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode 
mode);
 
     default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
-        return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
+        return findByBusinessKey(id, ProcessInstanceReadMode.READ_ONLY);
+    }
+
+    default Optional<ProcessInstance<T>> findByBusinessKey(String id, 
ProcessInstanceReadMode mode) {
+        return stream(mode).filter(pi -> 
id.equals(pi.businessKey())).findAny();
     }
 
     Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to