This is an automated email from the ASF dual-hosted git repository.

pefernan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new df620186a incubator-kie-issues#458: Create JPA Storage for `Job 
Service` (#2094)
df620186a is described below

commit df620186a8ceee756f44d4134bfeb6e409e7a397
Author: Pere Fernández <[email protected]>
AuthorDate: Tue Sep 3 15:07:03 2024 +0200

    incubator-kie-issues#458: Create JPA Storage for `Job Service` (#2094)
    
    * incubator-kie-issues#458: Create JPA Storage for `Job Service`
    
    * fix formatting
---
 jobs-service/jobs-service-storage-jpa/pom.xml      | 122 +++++++++++
 .../repository/jpa/JPAReactiveJobRepository.java   | 226 +++++++++++++++++++++
 .../JPAReactiveJobServiceManagementRepository.java | 133 ++++++++++++
 .../jpa/converter/JsonBinaryConverter.java         |  50 +++++
 .../repository/jpa/model/JobDetailsEntity.java     | 188 +++++++++++++++++
 .../jpa/model/JobServiceManagementEntity.java      |  66 ++++++
 .../jpa/repository/JobDetailsEntityRepository.java |  30 +++
 .../JobServiceManagementEntityRepository.java      |  30 +++
 .../jpa/utils/ReactiveRepositoryHelper.java        |  51 +++++
 .../src/main/resources/META-INF/beans.xml          |  20 ++
 .../db/jobs-service/V2.0.0__Create_Tables.sql      |  46 +++++
 .../jpa/JPAReactiveJobRepositoryTest.java          |  48 +++++
 ...ReactiveJobServiceManagementRepositoryTest.java | 107 ++++++++++
 .../jobs/service/resource/JPAJobResourceTest.java  |  29 +++
 .../src/test/resources/application.properties      |  32 +++
 jobs-service/pom.xml                               |   1 +
 kogito-apps-bom/pom.xml                            |  11 +
 17 files changed, 1190 insertions(+)

diff --git a/jobs-service/jobs-service-storage-jpa/pom.xml 
b/jobs-service/jobs-service-storage-jpa/pom.xml
new file mode 100644
index 000000000..e756309a0
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.kie.kogito</groupId>
+        <artifactId>jobs-service</artifactId>
+        <version>999-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>jobs-service-storage-jpa</artifactId>
+    <name>Kogito Apps :: Jobs Service :: Storage :: JPA</name>
+    <description>Jobs Service (Timers and Async Jobs) JPA Storage</description>
+
+    <properties>
+        
<java.module.name>org.kie.kogito.job.service.repository.jpa</java.module.name>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>jobs-service-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.persistence</groupId>
+            <artifactId>jakarta.persistence-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.smallrye.reactive</groupId>
+            <artifactId>mutiny-zero-flow-adapters</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-hibernate-orm-panache</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-jdbc-h2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-flyway</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>jobs-service-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-test-h2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>kogito-quarkus-test-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.rest-assured</groupId>
+            <artifactId>rest-assured</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.keycloak</groupId>
+            <artifactId>keycloak-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.tomakehurst</groupId>
+            <artifactId>wiremock-jre8</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
new file mode 100644
index 000000000..55f17d21d
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.time.OffsetDateTime;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
+
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity;
+import 
org.kie.kogito.jobs.service.repository.jpa.repository.JobDetailsEntityRepository;
+import 
org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper;
+import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
+import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.panache.common.Parameters;
+import io.quarkus.panache.common.Sort;
+import io.smallrye.mutiny.Multi;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import static java.time.OffsetDateTime.now;
+import static mutiny.zero.flow.adapters.AdaptersToReactiveStreams.publisher;
+import static org.kie.kogito.jobs.service.utils.DateUtil.DEFAULT_ZONE;
+
+@ApplicationScoped
+public class JPAReactiveJobRepository extends BaseReactiveJobRepository 
implements ReactiveJobRepository {
+
+    private static final String JOBS_BETWEEN_FIRE_TIMES_QUERY = "select job " +
+            "from JobDetailsEntity job " +
+            "where job.fireTime between :from and :to and job.status in 
:status";
+
+    private final JobDetailsEntityRepository repository;
+    private final ReactiveRepositoryHelper reactiveRepositoryHelper;
+
+    private final TriggerMarshaller triggerMarshaller;
+    private final RecipientMarshaller recipientMarshaller;
+
+    JPAReactiveJobRepository() {
+        this(null, null, null, null, null, null);
+    }
+
+    @Inject
+    public JPAReactiveJobRepository(Vertx vertx, JobEventPublisher 
jobEventPublisher, JobDetailsEntityRepository repository,
+            ReactiveRepositoryHelper reactiveRepositoryHelper,
+            TriggerMarshaller triggerMarshaller, RecipientMarshaller 
recipientMarshaller) {
+        super(vertx, jobEventPublisher);
+        this.repository = repository;
+        this.reactiveRepositoryHelper = reactiveRepositoryHelper;
+        this.triggerMarshaller = triggerMarshaller;
+        this.recipientMarshaller = recipientMarshaller;
+    }
+
+    @Override
+    public CompletionStage<JobDetails> doSave(JobDetails job) {
+        return this.reactiveRepositoryHelper.runAsync(() -> persist(job))
+                .thenApply(this::from);
+    }
+
+    private JobDetailsEntity persist(JobDetails job) {
+        JobDetailsEntity jobDetailsInstance = 
repository.findByIdOptional(job.getId()).orElseGet(JobDetailsEntity::new);
+
+        merge(job, jobDetailsInstance);
+
+        repository.persist(jobDetailsInstance);
+
+        return repository.findById(job.getId());
+    }
+
+    @Override
+    public CompletionStage<JobDetails> get(String id) {
+        return this.reactiveRepositoryHelper.runAsync(() -> 
repository.findById(id))
+                .thenApply(this::from);
+    }
+
+    @Override
+    public CompletionStage<Boolean> exists(String id) {
+        return this.reactiveRepositoryHelper.runAsync(() -> 
repository.findByIdOptional(id))
+                .thenApply(Optional::isPresent);
+    }
+
+    @Override
+    public CompletionStage<JobDetails> delete(String id) {
+        return this.reactiveRepositoryHelper.runAsync(() -> this.deleteJob(id))
+                .thenApply(this::from);
+
+    }
+
+    private JobDetailsEntity deleteJob(String id) {
+        JobDetailsEntity jobDetailsInstance = repository.findById(id);
+
+        if (Objects.isNull(jobDetailsInstance)) {
+            return null;
+        }
+
+        repository.delete(jobDetailsInstance);
+
+        return jobDetailsInstance;
+    }
+
+    String toColumName(SortTermField field) {
+        return switch (field) {
+            case FIRE_TIME -> "fireTime";
+            case CREATED -> "created";
+            case ID -> "id";
+            default -> throw new IllegalArgumentException("No colum name is 
defined for field: " + field);
+        };
+    }
+
+    @Override
+    public PublisherBuilder<JobDetails> findByStatusBetweenDates(ZonedDateTime 
fromFireTime,
+            ZonedDateTime toFireTime,
+            JobStatus[] status,
+            SortTerm[] orderBy) {
+
+        Parameters params = Parameters.with("from", 
fromFireTime.toOffsetDateTime())
+                .and("to", toFireTime.toOffsetDateTime())
+                .and("status", 
Arrays.stream(status).map(Enum::toString).toList());
+
+        Sort sort = Sort.empty();
+
+        Arrays.stream(orderBy).forEach(sortTerm -> {
+            String columnName = toColumName(sortTerm.getField());
+            sort.and(columnName, sortTerm.isAsc() ? Sort.Direction.Ascending : 
Sort.Direction.Descending);
+        });
+
+        return ReactiveStreams.fromPublisher(publisher(Multi.createFrom()
+                .completionStage(this.reactiveRepositoryHelper.runAsync(() -> 
repository.list(JOBS_BETWEEN_FIRE_TIMES_QUERY, sort, params.map())))
+                .flatMap(jobDetailsEntities -> 
Multi.createFrom().iterable(jobDetailsEntities))
+                .map(this::from)));
+
+    }
+
+    JobDetailsEntity merge(JobDetails job, JobDetailsEntity instance) {
+        if (Objects.isNull(instance)) {
+            instance = new JobDetailsEntity();
+        }
+
+        ObjectMapper mapper = ObjectMapperFactory.get();
+
+        OffsetDateTime lastUpdate = now().truncatedTo(ChronoUnit.MILLIS);
+
+        instance.setId(job.getId());
+        instance.setCorrelationId(job.getCorrelationId());
+        instance.setStatus(mapOptionalValue(job.getStatus(), Enum::name));
+        instance.setLastUpdate(lastUpdate);
+        instance.setRetries(job.getRetries());
+        instance.setExecutionCounter(job.getExecutionCounter());
+        instance.setScheduledId(job.getScheduledId());
+        instance.setPriority(job.getPriority());
+
+        instance.setRecipient(mapOptionalValue(job.getRecipient(), recipient 
-> mapper.valueToTree(recipientMarshaller.marshall(recipient).getMap())));
+        instance.setTrigger(mapOptionalValue(job.getTrigger(), trigger -> 
mapper.valueToTree(triggerMarshaller.marshall(job.getTrigger()).getMap())));
+        
instance.setFireTime(mapOptionalValue(job.getTrigger().hasNextFireTime(), 
DateUtil::dateToOffsetDateTime));
+
+        instance.setExecutionTimeout(job.getExecutionTimeout());
+        
instance.setExecutionTimeoutUnit(mapOptionalValue(job.getExecutionTimeoutUnit(),
 Enum::name));
+
+        
instance.setCreated(Optional.ofNullable(job.getCreated()).map(ZonedDateTime::toOffsetDateTime).orElse(lastUpdate));
+
+        return instance;
+    }
+
+    JobDetails from(JobDetailsEntity instance) {
+        if (instance == null) {
+            return null;
+        }
+
+        return JobDetails.builder()
+                .id(instance.getId())
+                .correlationId(instance.getCorrelationId())
+                .status(mapOptionalValue(instance.getStatus(), 
JobStatus::valueOf))
+                
.lastUpdate(instance.getLastUpdate().atZoneSameInstant(DEFAULT_ZONE))
+                .retries(instance.getRetries())
+                .executionCounter(instance.getExecutionCounter())
+                .scheduledId(instance.getScheduledId())
+                .priority(instance.getPriority())
+                .recipient(mapOptionalValue(instance.getRecipient(), recipient 
-> recipientMarshaller.unmarshall(JsonObject.mapFrom(recipient))))
+                .trigger(mapOptionalValue(instance.getTrigger(), trigger -> 
triggerMarshaller.unmarshall(JsonObject.mapFrom(trigger))))
+                .executionTimeout(instance.getExecutionTimeout())
+                
.executionTimeoutUnit(mapOptionalValue(instance.getExecutionTimeoutUnit(), 
ChronoUnit::valueOf))
+                .created(instance.getCreated().atZoneSameInstant(DEFAULT_ZONE))
+                .build();
+    }
+
+    private <T, R> R mapOptionalValue(T object, Function<T, R> mapper) {
+        return Optional.ofNullable(object)
+                .map(mapper)
+                .orElse(null);
+    }
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
new file mode 100644
index 000000000..0634dc844
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
+import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
+import 
org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity;
+import 
org.kie.kogito.jobs.service.repository.jpa.repository.JobServiceManagementEntityRepository;
+import 
org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.quarkus.panache.common.Parameters;
+import io.smallrye.mutiny.Uni;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import static java.time.OffsetDateTime.now;
+
+@ApplicationScoped
+public class JPAReactiveJobServiceManagementRepository implements 
JobServiceManagementRepository {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JPAReactiveJobServiceManagementRepository.class);
+
+    private final JobServiceManagementEntityRepository repository;
+    private final ReactiveRepositoryHelper reactiveRepositoryHelper;
+
+    @Inject
+    public 
JPAReactiveJobServiceManagementRepository(JobServiceManagementEntityRepository 
repository,
+            ReactiveRepositoryHelper reactiveRepositoryHelper) {
+        this.repository = repository;
+        this.reactiveRepositoryHelper = reactiveRepositoryHelper;
+    }
+
+    @Override
+    public Uni<JobServiceManagementInfo> getAndUpdate(String id, 
Function<JobServiceManagementInfo, JobServiceManagementInfo> computeUpdate) {
+        LOGGER.info("get {}", id);
+        return Uni.createFrom()
+                .completionStage(this.reactiveRepositoryHelper.runAsync(() -> 
doGetAndUpdate(id, computeUpdate)))
+                .onItem().ifNotNull().invoke(info -> LOGGER.trace("got {}", 
info));
+    }
+
+    private JobServiceManagementInfo doGetAndUpdate(String id, 
Function<JobServiceManagementInfo, JobServiceManagementInfo> computeUpdate) {
+
+        JobServiceManagementInfo info = this.repository.findByIdOptional(id)
+                .map(this::from)
+                .orElse(null);
+
+        return this.update(computeUpdate.apply(info));
+    }
+
+    @Override
+    public Uni<JobServiceManagementInfo> set(JobServiceManagementInfo info) {
+        LOGGER.info("set {}", info);
+        return 
Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> 
this.doSet(info)));
+    }
+
+    public JobServiceManagementInfo doSet(JobServiceManagementInfo info) {
+        return this.update(info);
+    }
+
+    private JobServiceManagementInfo update(JobServiceManagementInfo info) {
+
+        if (Objects.isNull(info)) {
+            return null;
+        }
+
+        JobServiceManagementEntity jobService = 
this.repository.findByIdOptional(info.getId()).orElse(new 
JobServiceManagementEntity());
+
+        jobService.setId(info.getId());
+        jobService.setToken(info.getToken());
+        jobService.setLastHeartBeat(info.getLastHeartbeat());
+
+        repository.persist(jobService);
+
+        return from(jobService);
+    }
+
+    @Override
+    public Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo 
info) {
+        return 
Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> 
this.doHeartbeat(info)));
+    }
+
+    private JobServiceManagementEntity findById(String id) {
+        return repository.findById(id);
+    }
+
+    private JobServiceManagementEntity 
findByIdAndToken(JobServiceManagementInfo info) {
+        return 
repository.find("#JobServiceManagementEntity.GetServiceByIdAndToken", 
Parameters.with("id", info.getId()).and("token", info.getToken()).map())
+                .firstResultOptional().orElse(null);
+    }
+
+    private JobServiceManagementInfo doHeartbeat(JobServiceManagementInfo 
info) {
+        JobServiceManagementEntity jobService = findByIdAndToken(info);
+
+        if (jobService == null) {
+            return null;
+        }
+
+        jobService.setLastHeartBeat(now());
+        repository.persist(jobService);
+
+        return from(jobService);
+    }
+
+    JobServiceManagementInfo from(JobServiceManagementEntity jobService) {
+        if (Objects.isNull(jobService)) {
+            return null;
+        }
+        return new JobServiceManagementInfo(jobService.getId(), 
jobService.getToken(), jobService.getLastHeartBeat());
+    }
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
new file mode 100644
index 000000000..b269ab9ea
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa.converter;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import jakarta.persistence.AttributeConverter;
+
+public class JsonBinaryConverter implements AttributeConverter<ObjectNode, 
byte[]> {
+
+    @Override
+    public byte[] convertToDatabaseColumn(ObjectNode attribute) {
+        try {
+            return attribute == null ? null : 
ObjectMapperFactory.get().writeValueAsBytes(attribute);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public ObjectNode convertToEntityAttribute(byte[] dbData) {
+        try {
+            return dbData == null ? null : 
ObjectMapperFactory.get().readValue(dbData, ObjectNode.class);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
new file mode 100644
index 000000000..9c1b317b4
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa.model;
+
+import java.time.OffsetDateTime;
+
+import 
org.kie.kogito.jobs.service.repository.jpa.converter.JsonBinaryConverter;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import jakarta.persistence.*;
+
+@Entity
+@Table(name = "job_details",
+        indexes = {
+                @Index(name = "job_details_fire_time_idx", columnList = 
"fire_time"),
+                @Index(name = "job_details_created_idx", columnList = 
"created")
+        })
+public class JobDetailsEntity {
+
+    @Id
+    private String id;
+
+    @Column(name = "correlation_id")
+    private String correlationId;
+
+    private String status;
+
+    @Column(name = "last_update")
+    @Temporal(TemporalType.TIMESTAMP)
+    private OffsetDateTime lastUpdate;
+
+    private Integer retries;
+
+    @Column(name = "execution_counter")
+    private Integer executionCounter;
+
+    @Column(name = "scheduled_id")
+    private String scheduledId;
+
+    private Integer priority;
+
+    @Convert(converter = JsonBinaryConverter.class)
+    private ObjectNode recipient;
+
+    @Convert(converter = JsonBinaryConverter.class)
+    private ObjectNode trigger;
+
+    @Column(name = "fire_time")
+    @Temporal(TemporalType.TIMESTAMP)
+    private OffsetDateTime fireTime;
+
+    @Column(name = "execution_timeout")
+    private Long executionTimeout;
+    @Column(name = "execution_timeout_unit")
+    private String executionTimeoutUnit;
+
+    @Temporal(TemporalType.TIMESTAMP)
+    private OffsetDateTime created;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public OffsetDateTime getLastUpdate() {
+        return lastUpdate;
+    }
+
+    public void setLastUpdate(OffsetDateTime lastUpdate) {
+        this.lastUpdate = lastUpdate;
+    }
+
+    public Integer getRetries() {
+        return retries;
+    }
+
+    public void setRetries(Integer retries) {
+        this.retries = retries;
+    }
+
+    public Integer getExecutionCounter() {
+        return executionCounter;
+    }
+
+    public void setExecutionCounter(Integer executionCounter) {
+        this.executionCounter = executionCounter;
+    }
+
+    public String getScheduledId() {
+        return scheduledId;
+    }
+
+    public void setScheduledId(String scheduledId) {
+        this.scheduledId = scheduledId;
+    }
+
+    public Integer getPriority() {
+        return priority;
+    }
+
+    public void setPriority(Integer priority) {
+        this.priority = priority;
+    }
+
+    public ObjectNode getRecipient() {
+        return recipient;
+    }
+
+    public void setRecipient(ObjectNode recipient) {
+        this.recipient = recipient;
+    }
+
+    public ObjectNode getTrigger() {
+        return trigger;
+    }
+
+    public void setTrigger(ObjectNode trigger) {
+        this.trigger = trigger;
+    }
+
+    public OffsetDateTime getFireTime() {
+        return fireTime;
+    }
+
+    public void setFireTime(OffsetDateTime fireTime) {
+        this.fireTime = fireTime;
+    }
+
+    public Long getExecutionTimeout() {
+        return executionTimeout;
+    }
+
+    public void setExecutionTimeout(Long executionTimeout) {
+        this.executionTimeout = executionTimeout;
+    }
+
+    public String getExecutionTimeoutUnit() {
+        return executionTimeoutUnit;
+    }
+
+    public void setExecutionTimeoutUnit(String executionTimeoutUnit) {
+        this.executionTimeoutUnit = executionTimeoutUnit;
+    }
+
+    public OffsetDateTime getCreated() {
+        return created;
+    }
+
+    public void setCreated(OffsetDateTime created) {
+        this.created = created;
+    }
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
new file mode 100644
index 000000000..77ac0a67a
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.model;
+
+import java.time.OffsetDateTime;
+
+import jakarta.persistence.*;
+
+@Entity
+@NamedQuery(name = "JobServiceManagementEntity.GetServiceByIdAndToken",
+        query = "select service " +
+                "from JobServiceManagementEntity service " +
+                "where service.id = :id and service.token = :token")
+@Table(name = "job_service_management")
+public class JobServiceManagementEntity {
+
+    @Id
+    private String id;
+
+    @Column(name = "last_heartbeat")
+    @Temporal(TemporalType.TIMESTAMP)
+    private OffsetDateTime lastHeartBeat;
+
+    private String token;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public OffsetDateTime getLastHeartBeat() {
+        return lastHeartBeat;
+    }
+
+    public void setLastHeartBeat(OffsetDateTime lastHeartBeat) {
+        this.lastHeartBeat = lastHeartBeat;
+    }
+
+    public String getToken() {
+        return token;
+    }
+
+    public void setToken(String token) {
+        this.token = token;
+    }
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
new file mode 100644
index 000000000..dd1003f63
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.repository;
+
+import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity;
+
+import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JobDetailsEntityRepository implements 
PanacheRepositoryBase<JobDetailsEntity, String> {
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
new file mode 100644
index 000000000..1a528ccb2
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.repository;
+
+import 
org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity;
+
+import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JobServiceManagementEntityRepository implements 
PanacheRepositoryBase<JobServiceManagementEntity, String> {
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
new file mode 100644
index 000000000..9dab31a5b
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.utils;
+
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import io.quarkus.narayana.jta.QuarkusTransaction;
+import io.quarkus.narayana.jta.TransactionRunnerOptions;
+import io.vertx.core.Vertx;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+public class ReactiveRepositoryHelper {
+
+    private Vertx vertx;
+
+    @Inject
+    public ReactiveRepositoryHelper(Vertx vertx) {
+        this.vertx = vertx;
+    }
+
+    public <T> CompletionStage<T> runAsync(Supplier<T> blockingFunction) {
+        return vertx.executeBlocking(() -> 
wrapInTransaction(blockingFunction)).toCompletionStage();
+    }
+
+    private <T> T wrapInTransaction(Supplier<T> function) {
+        TransactionRunnerOptions runner = QuarkusTransaction.isActive() ? 
QuarkusTransaction.joiningExisting() : QuarkusTransaction.requiringNew();
+
+        return runner.call(function::get);
+    }
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml 
b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a0eb9fbf8
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,20 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
 
b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
new file mode 100644
index 000000000..b48a32a95
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+create table job_details
+(
+    id                     varchar(50) primary key,
+    correlation_id         varchar(50),
+    status                 varchar(40),
+    last_update            timestamp,
+    retries                integer,
+    execution_counter      integer,
+    scheduled_id           varchar(40),
+    priority               integer,
+    recipient              varbinary(max),
+    trigger                varbinary(max),
+    fire_time              timestamp,
+    execution_timeout      bigint,
+    execution_timeout_unit varchar(40),
+    created                timestamp
+);
+
+create index job_details_fire_time_idx on job_details (fire_time);
+create index job_details_created_idx on job_details (created);
+
+CREATE TABLE job_service_management
+(
+    id             varchar(40) primary key,
+    last_heartbeat timestamp,
+    token          varchar(40) unique
+);
\ No newline at end of file
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
 
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
new file mode 100644
index 000000000..d1fe3cafe
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.impl.BaseJobRepositoryTest;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+public class JPAReactiveJobRepositoryTest extends BaseJobRepositoryTest {
+
+    @Inject
+    JPAReactiveJobRepository tested;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+
+        super.setUp();
+    }
+
+    @Override
+    public ReactiveJobRepository tested() {
+        return tested;
+    }
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
 
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
new file mode 100644
index 000000000..a0a18cab4
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.time.OffsetDateTime;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
+import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+class JPAReactiveJobServiceManagementRepositoryTest {
+
+    @Inject
+    JobServiceManagementRepository tested;
+
+    @BeforeEach
+    void setUp() {
+    }
+
+    @Test
+    void testGetAndUpdate() {
+        String id = "instance-id-1";
+        String token = "token1";
+        create(id, token);
+
+        AtomicReference<OffsetDateTime> date = new AtomicReference<>();
+        JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> {
+            date.set(DateUtil.now().toOffsetDateTime());
+            info.setLastHeartbeat(date.get());
+            return info;
+        }).await().indefinitely();
+        assertThat(updated.getId()).isEqualTo(id);
+        assertThat(date.get()).isNotNull();
+        assertThat(updated.getLastHeartbeat()).isEqualTo(date.get());
+        assertThat(updated.getToken()).isEqualTo(token);
+    }
+
+    @Test
+    void testGetAndUpdateNotExisting() {
+        String id = "instance-id-2";
+        AtomicReference<JobServiceManagementInfo> found = new 
AtomicReference<>(new JobServiceManagementInfo());
+        JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> {
+            found.set(info);
+            return info;
+        }).await().indefinitely();
+        assertThat(updated).isNull();
+        assertThat(found.get()).isNull();
+    }
+
+    private JobServiceManagementInfo create(String id, String token) {
+        JobServiceManagementInfo created = tested.set(new 
JobServiceManagementInfo(id, token, null)).await().indefinitely();
+        assertThat(created.getId()).isEqualTo(id);
+        assertThat(created.getToken()).isEqualTo(token);
+        assertThat(created.getLastHeartbeat()).isNull();
+        return created;
+    }
+
+    @Test
+    void testHeartbeat() {
+        String id = "instance-id-3";
+        String token = "token3";
+        JobServiceManagementInfo created = create(id, token);
+
+        JobServiceManagementInfo updated = 
tested.heartbeat(created).await().indefinitely();
+        assertThat(updated.getLastHeartbeat()).isNotNull();
+        
assertThat(updated.getLastHeartbeat()).isBefore(DateUtil.now().plusSeconds(1).toOffsetDateTime());
+    }
+
+    @Test
+    void testConflictHeartbeat() {
+        String id = "instance-id-4";
+        String token = "token4";
+        create(id, token);
+
+        JobServiceManagementInfo updated = tested.heartbeat(new 
JobServiceManagementInfo(id, "differentToken", null)).await().indefinitely();
+        assertThat(updated).isNull();
+    }
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
 
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
new file mode 100644
index 000000000..0fdc67d71
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.resource;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+public class JPAJobResourceTest extends BaseJobResourceTest {
+
+}
diff --git 
a/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
 
b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
new file mode 100644
index 000000000..ef10d9621
--- /dev/null
+++ 
b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Kogito
+kogito.apps.persistence.type=jdbc
+# Data source
+quarkus.datasource.db-kind=h2
+quarkus.datasource.username=kogito
+quarkus.datasource.jdbc.url=jdbc:h2:mem:default;NON_KEYWORDS=VALUE,KEY
+quarkus.flyway.migrate-at-start=true
+quarkus.flyway.clean-at-start=true
+quarkus.flyway.locations=db/jobs-service
+# Disabling Security for tests
+quarkus.oidc.enabled=false
+quarkus.oidc.tenant-enabled=false
+quarkus.oidc.auth-server-url=none
+quarkus.keycloak.devservices.enabled=false
\ No newline at end of file
diff --git a/jobs-service/pom.xml b/jobs-service/pom.xml
index df5f21f77..8eb5f0ff2 100644
--- a/jobs-service/pom.xml
+++ b/jobs-service/pom.xml
@@ -39,6 +39,7 @@
     <module>jobs-recipients</module>
     <module>jobs-service-common</module>
     <module>jobs-service-postgresql-common</module>
+    <module>jobs-service-storage-jpa</module>
     <module>jobs-service-postgresql</module>
     <module>jobs-service-inmemory</module>
     <module>kogito-addons-jobs-service</module>
diff --git a/kogito-apps-bom/pom.xml b/kogito-apps-bom/pom.xml
index 5e903d497..774cd4696 100644
--- a/kogito-apps-bom/pom.xml
+++ b/kogito-apps-bom/pom.xml
@@ -78,6 +78,17 @@
                 <version>${project.version}</version>
                 <classifier>sources</classifier>
             </dependency>
+            <dependency>
+                <groupId>org.kie.kogito</groupId>
+                <artifactId>jobs-service-storage-jpa</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.kie.kogito</groupId>
+                <artifactId>jobs-service-storage-jpa</artifactId>
+                <version>${project.version}</version>
+                <classifier>sources</classifier>
+            </dependency>
             <dependency>
                 <groupId>org.kie.kogito</groupId>
                 <artifactId>jobs-service-postgresql</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to