This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new c82ab79a56 Add correlation persistence for MongoDB (#3612)
c82ab79a56 is described below

commit c82ab79a5693634b6e549fd4420dffba43735b49
Author: Matheus Cruz <[email protected]>
AuthorDate: Fri Aug 16 15:51:36 2024 -0300

    Add correlation persistence for MongoDB (#3612)
    
    * Add MongoDB support for correlation
    
    * Apply pull request suggestions
---
 .../correlation/MongoDBCorrelationRepository.java  | 109 +++++++++++++++++++
 .../correlation/MongoDBCorrelationService.java     |  61 +++++++++++
 .../correlation/MongoDBCorrelationServiceIT.java   | 119 +++++++++++++++++++++
 .../quarkus/MongoDBCorrelationServiceProducer.java |  37 +++++++
 4 files changed, 326 insertions(+)

diff --git 
a/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationRepository.java
 
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationRepository.java
new file mode 100644
index 0000000000..0aa053a4cc
--- /dev/null
+++ 
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationRepository.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.mongodb.correlation;
+
+import java.io.UncheckedIOException;
+import java.util.Map;
+
+import org.bson.Document;
+import org.bson.codecs.configuration.CodecRegistries;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.conversions.Bson;
+import org.kie.kogito.correlation.CompositeCorrelation;
+import org.kie.kogito.correlation.Correlation;
+import org.kie.kogito.correlation.CorrelationInstance;
+import org.kie.kogito.correlation.SimpleCorrelation;
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.result.InsertOneResult;
+
+public class MongoDBCorrelationRepository {
+
+    private final MongoCollection<Document> collection;
+    private final ObjectMapper objectMapper;
+
+    private static final String ENCODED_CORRELATION_ID_FIELD = 
"encodedCorrelationId";
+    private static final String CORRELATED_ID_FIELD = "correlatedId";
+    private static final String CORRELATION_FIELD = "correlation";
+    private static final String CORRELATION_COLLECTION_NAME = "correlations";
+
+    public MongoDBCorrelationRepository(MongoClient mongoClient, String 
dbName) {
+        CodecRegistry registry = 
CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry());
+        this.collection = 
mongoClient.getDatabase(dbName).getCollection(CORRELATION_COLLECTION_NAME).withCodecRegistry(registry);
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addAbstractTypeMapping(Correlation.class, 
SimpleCorrelation.class);
+        this.objectMapper = 
ObjectMapperFactory.get().copy().registerModule(simpleModule);
+    }
+
+    public CorrelationInstance insert(final String encodedCorrelationId, final 
String correlatedId, final Correlation correlation) {
+
+        CorrelationInstance correlationInstance = new 
CorrelationInstance(encodedCorrelationId, correlatedId, correlation);
+        try {
+            Map<String, Object> object = Map.of(
+                    ENCODED_CORRELATION_ID_FIELD, encodedCorrelationId,
+                    CORRELATED_ID_FIELD, correlatedId,
+                    CORRELATION_FIELD, correlation);
+            String json = this.objectMapper.writeValueAsString(object);
+            InsertOneResult insertOneResult = 
this.collection.insertOne(Document.parse(json));
+            return insertOneResult.getInsertedId() != null ? 
correlationInstance : null;
+        } catch (JsonProcessingException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    public CorrelationInstance findByEncodedCorrelationId(String encoded) {
+        Bson eq = Filters.eq(ENCODED_CORRELATION_ID_FIELD, encoded);
+        return getCorrelationInstanceByFilter(eq);
+    }
+
+    public CorrelationInstance findByCorrelatedId(String correlatedId) {
+        Bson eq = Filters.eq(CORRELATED_ID_FIELD, correlatedId);
+        return getCorrelationInstanceByFilter(eq);
+    }
+
+    private CorrelationInstance getCorrelationInstanceByFilter(Bson eq) {
+        Document first = this.collection.find(eq).first();
+        if (first == null) {
+            return null;
+        } else {
+            Document document = first.get(CORRELATION_FIELD, Document.class);
+            try {
+                CompositeCorrelation compositeCorrelation = 
this.objectMapper.readValue(document.toJson(), CompositeCorrelation.class);
+                return new CorrelationInstance(
+                        first.getString(ENCODED_CORRELATION_ID_FIELD),
+                        first.getString(CORRELATED_ID_FIELD),
+                        compositeCorrelation);
+            } catch (JsonProcessingException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    public void delete(String encoded) {
+        Bson eq = Filters.eq(ENCODED_CORRELATION_ID_FIELD, encoded);
+        this.collection.deleteOne(eq);
+    }
+}
diff --git 
a/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationService.java
 
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationService.java
new file mode 100644
index 0000000000..f3fe19ae49
--- /dev/null
+++ 
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.mongodb.correlation;
+
+import java.util.Optional;
+
+import org.kie.kogito.correlation.Correlation;
+import org.kie.kogito.correlation.CorrelationEncoder;
+import org.kie.kogito.correlation.CorrelationInstance;
+import org.kie.kogito.correlation.CorrelationService;
+import org.kie.kogito.event.correlation.MD5CorrelationEncoder;
+
+public class MongoDBCorrelationService implements CorrelationService {
+
+    private final MongoDBCorrelationRepository correlationRepository;
+    private final CorrelationEncoder correlationEncoder;
+
+    public MongoDBCorrelationService(MongoDBCorrelationRepository 
correlationRepository) {
+        this.correlationRepository = correlationRepository;
+        this.correlationEncoder = new MD5CorrelationEncoder();
+    }
+
+    @Override
+    public CorrelationInstance create(Correlation correlation, String 
correlatedId) {
+        String encodedCorrelationId = 
this.correlationEncoder.encode(correlation);
+        return this.correlationRepository.insert(encodedCorrelationId, 
correlatedId, correlation);
+    }
+
+    @Override
+    public Optional<CorrelationInstance> find(Correlation correlation) {
+        String encodedCorrelationId = correlationEncoder.encode(correlation);
+        return 
Optional.ofNullable(this.correlationRepository.findByEncodedCorrelationId(encodedCorrelationId));
+    }
+
+    @Override
+    public Optional<CorrelationInstance> findByCorrelatedId(String 
correlatedId) {
+        return 
Optional.ofNullable(this.correlationRepository.findByCorrelatedId(correlatedId));
+    }
+
+    @Override
+    public void delete(Correlation correlation) {
+        String encodedCorrelationId = correlationEncoder.encode(correlation);
+        this.correlationRepository.delete(encodedCorrelationId);
+    }
+}
diff --git 
a/addons/common/persistence/mongodb/src/test/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationServiceIT.java
 
b/addons/common/persistence/mongodb/src/test/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationServiceIT.java
new file mode 100644
index 0000000000..f3abeaf61b
--- /dev/null
+++ 
b/addons/common/persistence/mongodb/src/test/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationServiceIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.mongodb.correlation;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.correlation.CompositeCorrelation;
+import org.kie.kogito.correlation.CorrelationInstance;
+import org.kie.kogito.correlation.SimpleCorrelation;
+import org.kie.kogito.testcontainers.KogitoMongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Testcontainers
+class MongoDBCorrelationServiceIT {
+
+    @Container
+    final static KogitoMongoDBContainer mongoDBContainer = new 
KogitoMongoDBContainer();
+    private static MongoDBCorrelationService correlationService;
+    private static MongoClient mongoClient;
+    private static final String DB_NAME = "test";
+    private static final String COLLECTION_NAME = "correlations";
+
+    @BeforeAll
+    static void setUp() {
+        mongoDBContainer.start();
+        mongoClient = MongoClients.create(mongoDBContainer.getReplicaSetUrl());
+        correlationService = new MongoDBCorrelationService(new 
MongoDBCorrelationRepository(
+                mongoClient, DB_NAME));
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).drop();
+    }
+
+    @Test
+    void shouldSaveCorrelation() {
+        // arrange
+        String correlatedId = "id";
+        CompositeCorrelation correlation = new 
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "Rio 
de Janeiro")));
+
+        // act
+        correlationService.create(correlation, correlatedId);
+
+        // assert
+        Optional<CorrelationInstance> byCorrelatedId = 
correlationService.findByCorrelatedId(correlatedId);
+        assertThat(byCorrelatedId).isNotEmpty();
+    }
+
+    @Test
+    void shouldDeleteCorrelation() {
+        // arrange
+        String correlatedId = "id";
+        CompositeCorrelation correlation = new 
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "São 
Paulo")));
+        correlationService.create(correlation, correlatedId);
+
+        // act
+        correlationService.delete(correlation);
+
+        // assert
+        
assertThat(correlationService.findByCorrelatedId(correlatedId)).isEmpty();
+    }
+
+    @Test
+    void shouldFindByCorrelatedId() {
+        // arrange
+        String correlatedId = "id";
+        CompositeCorrelation correlation = new 
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", 
"Goiânia")));
+        correlationService.create(correlation, correlatedId);
+
+        // act
+        Optional<CorrelationInstance> byCorrelatedId = 
correlationService.findByCorrelatedId(correlatedId);
+
+        // assert
+        assertThat(byCorrelatedId).isNotEmpty();
+    }
+
+    @Test
+    void shouldFindByCorrelation() {
+        // arrange
+        CompositeCorrelation correlation = new 
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", 
"Osasco")));
+        String correlatedId = "id";
+
+        correlationService.create(correlation, correlatedId);
+
+        // act
+        Optional<CorrelationInstance> correlationInstance = 
correlationService.find(correlation);
+
+        // assert
+        assertThat(correlationInstance).isNotEmpty();
+    }
+
+}
diff --git 
a/quarkus/addons/persistence/mongodb/runtime/src/main/java/org/kie/kogito/persistence/quarkus/MongoDBCorrelationServiceProducer.java
 
b/quarkus/addons/persistence/mongodb/runtime/src/main/java/org/kie/kogito/persistence/quarkus/MongoDBCorrelationServiceProducer.java
new file mode 100644
index 0000000000..4ec1308607
--- /dev/null
+++ 
b/quarkus/addons/persistence/mongodb/runtime/src/main/java/org/kie/kogito/persistence/quarkus/MongoDBCorrelationServiceProducer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.persistence.quarkus;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.kie.kogito.mongodb.correlation.MongoDBCorrelationRepository;
+import org.kie.kogito.mongodb.correlation.MongoDBCorrelationService;
+
+import com.mongodb.client.MongoClient;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+
+@ApplicationScoped
+public class MongoDBCorrelationServiceProducer {
+
+    @Produces
+    public MongoDBCorrelationService getMongoDBCorrelationService(MongoClient 
mongoClient, @ConfigProperty(name = "quarkus.mongodb.database", defaultValue = 
"kogito") String dbName) {
+        return new MongoDBCorrelationService(new 
MongoDBCorrelationRepository(mongoClient, dbName));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to