This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 1c826524a0 [Fix_#3413] Speed up businesskey query performance (#3441)
1c826524a0 is described below
commit 1c826524a029d0de5db8ccca4cbf1875edd49a9b
Author: Francisco Javier Tirado Sarti
<[email protected]>
AuthorDate: Sat Mar 16 12:35:56 2024 +0100
[Fix_#3413] Speed up businesskey query performance (#3441)
* [Fix #3413] Speed up business key query performance
* [Fix #3413] Walters comments
* [Fix #3413] Fixing flaky test
---
.../kogito/persistence/jdbc/GenericRepository.java | 36 +++++++++++++++++-----
.../persistence/jdbc/JDBCProcessInstances.java | 8 ++++-
.../kie/kogito/persistence/jdbc/Repository.java | 6 +++-
.../db/ansi/V10.0.0__add_business_key_ansi.sql | 11 +++++++
.../V10.0.0__add_business_key_PostgreSQL.sql | 12 ++++++++
.../jdbc/AbstractProcessInstancesIT.java | 5 +--
.../org/kie/kogito/process/ProcessInstances.java | 6 +++-
7 files changed, 71 insertions(+), 13 deletions(-)
diff --git
a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java
b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java
index 58fc611b17..9dd066ee9f 100644
---
a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java
+++
b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/GenericRepository.java
@@ -34,16 +34,11 @@ import java.util.stream.StreamSupport;
import javax.sql.DataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class GenericRepository extends Repository {
private static final String PAYLOAD = "payload";
private static final String VERSION = "version";
- private static final Logger LOGGER =
LoggerFactory.getLogger(GenericRepository.class);
-
private final DataSource dataSource;
public GenericRepository(DataSource dataSource) {
@@ -51,17 +46,25 @@ public class GenericRepository extends Repository {
}
@Override
- void insertInternal(String processId, String processVersion, UUID id,
byte[] payload) {
+ void insertInternal(String processId, String processVersion, UUID id,
byte[] payload, String businessKey) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement =
connection.prepareStatement(INSERT)) {
- statement.setString(1, id.toString());
+ String processInstanceId = id.toString();
+ statement.setString(1, processInstanceId);
statement.setBytes(2, payload);
statement.setString(3, processId);
statement.setString(4, processVersion);
statement.setLong(5, 0L);
statement.executeUpdate();
+ if (businessKey != null) {
+ try (PreparedStatement businessKeyStmt =
connection.prepareStatement(INSERT_BUSINESS_KEY)) {
+ businessKeyStmt.setString(1, businessKey);
+ businessKeyStmt.setString(2, processInstanceId);
+ businessKeyStmt.executeUpdate();
+ }
+ }
} catch (Exception e) {
- throw uncheckedException(e, "Error inserting process instance %s",
id);
+ throw uncheckedException(e, "Error inserting process instance id:
%s, processId: %s processVersion: %s business key: %s", id, processId,
processVersion, businessKey);
}
}
@@ -140,6 +143,23 @@ public class GenericRepository extends Repository {
return Optional.empty();
}
+ @Override
+ Optional<Record> findByBusinessKey(String processId, String
processVersion, String businessKey) {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(sqlIncludingVersion(FIND_BY_BUSINESS_KEY,
processVersion))) {
+ statement.setString(1, businessKey);
+ statement.setString(2, processId);
+ if (processVersion != null) {
+ statement.setString(3, processVersion);
+ }
+ try (ResultSet resultSet = statement.executeQuery()) {
+ return resultSet.next() ? Optional.of(from(resultSet)) :
Optional.empty();
+ }
+ } catch (Exception e) {
+ throw uncheckedException(e, "Error finding process instance.
Business key: %s, Process Id: %s, Process version: %s", businessKey, processId,
processVersion);
+ }
+ }
+
private static class CloseableWrapper implements Runnable {
private Deque<AutoCloseable> wrapped = new ArrayDeque<>();
diff --git
a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java
b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java
index 789da4f5c7..26c089fecf 100644
---
a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java
+++
b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java
@@ -60,7 +60,7 @@ public class JDBCProcessInstances implements
MutableProcessInstances {
public void create(String id, ProcessInstance instance) {
LOGGER.debug("Creating process instance id: {}, processId: {},
processVersion: {}", id, process.id(), process.version());
if (isActive(instance)) {
- repository.insertInternal(process.id(), process.version(),
UUID.fromString(id), marshaller.marshallProcessInstance(instance));
+ repository.insertInternal(process.id(), process.version(),
UUID.fromString(id), marshaller.marshallProcessInstance(instance),
instance.businessKey());
} else {
LOGGER.warn("Skipping create of process instance id: {}, state:
{}", id, instance.status());
}
@@ -101,6 +101,12 @@ public class JDBCProcessInstances implements
MutableProcessInstances {
return repository.findByIdInternal(process.id(), process.version(),
UUID.fromString(id)).map(r -> unmarshall(r, mode));
}
+ @Override
+ public Optional<ProcessInstance<?>> findByBusinessKey(String businessKey,
ProcessInstanceReadMode mode) {
+ LOGGER.debug("Find process instance using business Key : {}",
businessKey);
+ return repository.findByBusinessKey(process.id(), process.version(),
businessKey).map(r -> unmarshall(r, mode));
+ }
+
@Override
public Stream<ProcessInstance<?>> stream(ProcessInstanceReadMode mode) {
LOGGER.debug("Find process instance values using mode: {}", mode);
diff --git
a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java
b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java
index 69ac9bfe69..a1d1036766 100644
---
a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java
+++
b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/Repository.java
@@ -25,8 +25,10 @@ import java.util.stream.Stream;
abstract class Repository {
static final String INSERT = "INSERT INTO process_instances (id, payload,
process_id, process_version, version) VALUES (?, ?, ?, ?, ?)";
+ static final String INSERT_BUSINESS_KEY = "INSERT INTO
business_key_mapping (business_key,process_instance_id) VALUES (?,?)";
static final String FIND_ALL = "SELECT payload, version FROM
process_instances WHERE process_id = ?";
static final String FIND_BY_ID = "SELECT payload, version FROM
process_instances WHERE process_id = ? and id = ?";
+ static final String FIND_BY_BUSINESS_KEY = "SELECT payload, version FROM
process_instances INNER JOIN business_key_mapping ON id = process_instance_id
WHERE business_key = ? and process_id = ?";
static final String UPDATE = "UPDATE process_instances SET payload = ?
WHERE process_id = ? and id = ?";
static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET
payload = ?, version = ? WHERE process_id = ? and id = ? and version = ?";
static final String DELETE = "DELETE FROM process_instances WHERE
process_id = ? and id = ?";
@@ -51,7 +53,7 @@ abstract class Repository {
}
}
- abstract void insertInternal(String processId, String processVersion, UUID
id, byte[] payload);
+ abstract void insertInternal(String processId, String processVersion, UUID
id, byte[] payload, String businessKey);
abstract void updateInternal(String processId, String processVersion, UUID
id, byte[] payload);
@@ -61,6 +63,8 @@ abstract class Repository {
abstract Optional<Record> findByIdInternal(String processId, String
processVersion, UUID id);
+ abstract Optional<Record> findByBusinessKey(String processId, String
processVersion, String businessKey);
+
abstract Stream<Record> findAllInternal(String processId, String
processVersion);
protected RuntimeException uncheckedException(Exception ex, String
message, Object... param) {
diff --git
a/addons/common/persistence/jdbc/src/main/resources/db/ansi/V10.0.0__add_business_key_ansi.sql
b/addons/common/persistence/jdbc/src/main/resources/db/ansi/V10.0.0__add_business_key_ansi.sql
new file mode 100644
index 0000000000..dad119c2ad
--- /dev/null
+++
b/addons/common/persistence/jdbc/src/main/resources/db/ansi/V10.0.0__add_business_key_ansi.sql
@@ -0,0 +1,11 @@
+CREATE TABLE business_key_mapping (
+ business_key VARCHAR (255) NOT NULL,
+ process_instance_id VARCHAR (36) NOT NULL,
+ CONSTRAINT business_key_primary_key PRIMARY KEY (business_key),
+ CONSTRAINT fk_process_instances
+ FOREIGN KEY (process_instance_id)
+ REFERENCES process_instances(id)
+ ON DELETE CASCADE
+);
+
+CREATE INDEX idx_business_key_process_instance_id ON business_key_mapping
(process_instance_id);
diff --git
a/addons/common/persistence/jdbc/src/main/resources/db/postgresql/V10.0.0__add_business_key_PostgreSQL.sql
b/addons/common/persistence/jdbc/src/main/resources/db/postgresql/V10.0.0__add_business_key_PostgreSQL.sql
new file mode 100644
index 0000000000..4d04dad047
--- /dev/null
+++
b/addons/common/persistence/jdbc/src/main/resources/db/postgresql/V10.0.0__add_business_key_PostgreSQL.sql
@@ -0,0 +1,12 @@
+CREATE TABLE business_key_mapping (
+ business_key character (255) NOT NULL,
+ process_instance_id character (36) NOT NULL,
+ PRIMARY KEY (business_key),
+ CONSTRAINT fk_process_instances
+ FOREIGN KEY (process_instance_id)
+ REFERENCES process_instances(id)
+ ON DELETE CASCADE
+);
+
+
+CREATE INDEX idx_business_key_process_instance_id ON business_key_mapping
(process_instance_id);
diff --git
a/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java
b/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java
index 985bd61348..3769ce2329 100644
---
a/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java
+++
b/addons/common/persistence/jdbc/src/test/java/org/kie/persistence/jdbc/AbstractProcessInstancesIT.java
@@ -152,13 +152,14 @@ abstract class AbstractProcessInstancesIT {
@Test
void testBasicFlow() {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
+ final String businessKey = "manolo";
BpmnProcess process = createProcess(factory, "BPMN2-UserTask.bpmn2");
- ProcessInstance<BpmnVariables> processInstance =
process.createInstance(BpmnVariables.create(singletonMap("test",
+ ProcessInstance<BpmnVariables> processInstance =
process.createInstance(businessKey, BpmnVariables.create(singletonMap("test",
"test")));
processInstance.start();
JDBCProcessInstances processInstances = (JDBCProcessInstances)
process.instances();
- Optional<?> foundOne = processInstances.findById(processInstance.id());
+ Optional<?> foundOne = processInstances.findByBusinessKey(businessKey);
BpmnProcessInstance instanceOne = (BpmnProcessInstance) foundOne.get();
processInstances.update(processInstance.id(), instanceOne);
diff --git
a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
index 6cc0d95649..a0ce2e8c42 100644
--- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
+++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
@@ -30,7 +30,11 @@ public interface ProcessInstances<T> {
Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode
mode);
default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
- return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
+ return findByBusinessKey(id, ProcessInstanceReadMode.READ_ONLY);
+ }
+
+ default Optional<ProcessInstance<T>> findByBusinessKey(String id,
ProcessInstanceReadMode mode) {
+ return stream(mode).filter(pi ->
id.equals(pi.businessKey())).findAny();
}
Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]