This is an automated email from the ASF dual-hosted git repository.
pefernan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new df620186a incubator-kie-issues#458: Create JPA Storage for `Job
Service` (#2094)
df620186a is described below
commit df620186a8ceee756f44d4134bfeb6e409e7a397
Author: Pere Fernández <[email protected]>
AuthorDate: Tue Sep 3 15:07:03 2024 +0200
incubator-kie-issues#458: Create JPA Storage for `Job Service` (#2094)
* incubator-kie-issues#458: Create JPA Storage for `Job Service`
* fix formatting
---
jobs-service/jobs-service-storage-jpa/pom.xml | 122 +++++++++++
.../repository/jpa/JPAReactiveJobRepository.java | 226 +++++++++++++++++++++
.../JPAReactiveJobServiceManagementRepository.java | 133 ++++++++++++
.../jpa/converter/JsonBinaryConverter.java | 50 +++++
.../repository/jpa/model/JobDetailsEntity.java | 188 +++++++++++++++++
.../jpa/model/JobServiceManagementEntity.java | 66 ++++++
.../jpa/repository/JobDetailsEntityRepository.java | 30 +++
.../JobServiceManagementEntityRepository.java | 30 +++
.../jpa/utils/ReactiveRepositoryHelper.java | 51 +++++
.../src/main/resources/META-INF/beans.xml | 20 ++
.../db/jobs-service/V2.0.0__Create_Tables.sql | 46 +++++
.../jpa/JPAReactiveJobRepositoryTest.java | 48 +++++
...ReactiveJobServiceManagementRepositoryTest.java | 107 ++++++++++
.../jobs/service/resource/JPAJobResourceTest.java | 29 +++
.../src/test/resources/application.properties | 32 +++
jobs-service/pom.xml | 1 +
kogito-apps-bom/pom.xml | 11 +
17 files changed, 1190 insertions(+)
diff --git a/jobs-service/jobs-service-storage-jpa/pom.xml
b/jobs-service/jobs-service-storage-jpa/pom.xml
new file mode 100644
index 000000000..e756309a0
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service</artifactId>
+ <version>999-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>jobs-service-storage-jpa</artifactId>
+ <name>Kogito Apps :: Jobs Service :: Storage :: JPA</name>
+ <description>Jobs Service (Timers and Async Jobs) JPA Storage</description>
+
+ <properties>
+
<java.module.name>org.kie.kogito.job.service.repository.jpa</java.module.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.persistence</groupId>
+ <artifactId>jakarta.persistence-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.smallrye.reactive</groupId>
+ <artifactId>mutiny-zero-flow-adapters</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-hibernate-orm-panache</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-jdbc-h2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-flyway</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-test-h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-quarkus-test-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.keycloak</groupId>
+ <artifactId>keycloak-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-jre8</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
new file mode 100644
index 000000000..55f17d21d
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.time.OffsetDateTime;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
+
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity;
+import
org.kie.kogito.jobs.service.repository.jpa.repository.JobDetailsEntityRepository;
+import
org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper;
+import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
+import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.panache.common.Parameters;
+import io.quarkus.panache.common.Sort;
+import io.smallrye.mutiny.Multi;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import static java.time.OffsetDateTime.now;
+import static mutiny.zero.flow.adapters.AdaptersToReactiveStreams.publisher;
+import static org.kie.kogito.jobs.service.utils.DateUtil.DEFAULT_ZONE;
+
+@ApplicationScoped
+public class JPAReactiveJobRepository extends BaseReactiveJobRepository
implements ReactiveJobRepository {
+
+ private static final String JOBS_BETWEEN_FIRE_TIMES_QUERY = "select job " +
+ "from JobDetailsEntity job " +
+ "where job.fireTime between :from and :to and job.status in
:status";
+
+ private final JobDetailsEntityRepository repository;
+ private final ReactiveRepositoryHelper reactiveRepositoryHelper;
+
+ private final TriggerMarshaller triggerMarshaller;
+ private final RecipientMarshaller recipientMarshaller;
+
+ JPAReactiveJobRepository() {
+ this(null, null, null, null, null, null);
+ }
+
+ @Inject
+ public JPAReactiveJobRepository(Vertx vertx, JobEventPublisher
jobEventPublisher, JobDetailsEntityRepository repository,
+ ReactiveRepositoryHelper reactiveRepositoryHelper,
+ TriggerMarshaller triggerMarshaller, RecipientMarshaller
recipientMarshaller) {
+ super(vertx, jobEventPublisher);
+ this.repository = repository;
+ this.reactiveRepositoryHelper = reactiveRepositoryHelper;
+ this.triggerMarshaller = triggerMarshaller;
+ this.recipientMarshaller = recipientMarshaller;
+ }
+
+ @Override
+ public CompletionStage<JobDetails> doSave(JobDetails job) {
+ return this.reactiveRepositoryHelper.runAsync(() -> persist(job))
+ .thenApply(this::from);
+ }
+
+ private JobDetailsEntity persist(JobDetails job) {
+ JobDetailsEntity jobDetailsInstance =
repository.findByIdOptional(job.getId()).orElseGet(JobDetailsEntity::new);
+
+ merge(job, jobDetailsInstance);
+
+ repository.persist(jobDetailsInstance);
+
+ return repository.findById(job.getId());
+ }
+
+ @Override
+ public CompletionStage<JobDetails> get(String id) {
+ return this.reactiveRepositoryHelper.runAsync(() ->
repository.findById(id))
+ .thenApply(this::from);
+ }
+
+ @Override
+ public CompletionStage<Boolean> exists(String id) {
+ return this.reactiveRepositoryHelper.runAsync(() ->
repository.findByIdOptional(id))
+ .thenApply(Optional::isPresent);
+ }
+
+ @Override
+ public CompletionStage<JobDetails> delete(String id) {
+ return this.reactiveRepositoryHelper.runAsync(() -> this.deleteJob(id))
+ .thenApply(this::from);
+
+ }
+
+ private JobDetailsEntity deleteJob(String id) {
+ JobDetailsEntity jobDetailsInstance = repository.findById(id);
+
+ if (Objects.isNull(jobDetailsInstance)) {
+ return null;
+ }
+
+ repository.delete(jobDetailsInstance);
+
+ return jobDetailsInstance;
+ }
+
+ String toColumName(SortTermField field) {
+ return switch (field) {
+ case FIRE_TIME -> "fireTime";
+ case CREATED -> "created";
+ case ID -> "id";
+ default -> throw new IllegalArgumentException("No colum name is
defined for field: " + field);
+ };
+ }
+
+ @Override
+ public PublisherBuilder<JobDetails> findByStatusBetweenDates(ZonedDateTime
fromFireTime,
+ ZonedDateTime toFireTime,
+ JobStatus[] status,
+ SortTerm[] orderBy) {
+
+ Parameters params = Parameters.with("from",
fromFireTime.toOffsetDateTime())
+ .and("to", toFireTime.toOffsetDateTime())
+ .and("status",
Arrays.stream(status).map(Enum::toString).toList());
+
+ Sort sort = Sort.empty();
+
+ Arrays.stream(orderBy).forEach(sortTerm -> {
+ String columnName = toColumName(sortTerm.getField());
+ sort.and(columnName, sortTerm.isAsc() ? Sort.Direction.Ascending :
Sort.Direction.Descending);
+ });
+
+ return ReactiveStreams.fromPublisher(publisher(Multi.createFrom()
+ .completionStage(this.reactiveRepositoryHelper.runAsync(() ->
repository.list(JOBS_BETWEEN_FIRE_TIMES_QUERY, sort, params.map())))
+ .flatMap(jobDetailsEntities ->
Multi.createFrom().iterable(jobDetailsEntities))
+ .map(this::from)));
+
+ }
+
+ JobDetailsEntity merge(JobDetails job, JobDetailsEntity instance) {
+ if (Objects.isNull(instance)) {
+ instance = new JobDetailsEntity();
+ }
+
+ ObjectMapper mapper = ObjectMapperFactory.get();
+
+ OffsetDateTime lastUpdate = now().truncatedTo(ChronoUnit.MILLIS);
+
+ instance.setId(job.getId());
+ instance.setCorrelationId(job.getCorrelationId());
+ instance.setStatus(mapOptionalValue(job.getStatus(), Enum::name));
+ instance.setLastUpdate(lastUpdate);
+ instance.setRetries(job.getRetries());
+ instance.setExecutionCounter(job.getExecutionCounter());
+ instance.setScheduledId(job.getScheduledId());
+ instance.setPriority(job.getPriority());
+
+ instance.setRecipient(mapOptionalValue(job.getRecipient(), recipient
-> mapper.valueToTree(recipientMarshaller.marshall(recipient).getMap())));
+ instance.setTrigger(mapOptionalValue(job.getTrigger(), trigger ->
mapper.valueToTree(triggerMarshaller.marshall(job.getTrigger()).getMap())));
+
instance.setFireTime(mapOptionalValue(job.getTrigger().hasNextFireTime(),
DateUtil::dateToOffsetDateTime));
+
+ instance.setExecutionTimeout(job.getExecutionTimeout());
+
instance.setExecutionTimeoutUnit(mapOptionalValue(job.getExecutionTimeoutUnit(),
Enum::name));
+
+
instance.setCreated(Optional.ofNullable(job.getCreated()).map(ZonedDateTime::toOffsetDateTime).orElse(lastUpdate));
+
+ return instance;
+ }
+
+ JobDetails from(JobDetailsEntity instance) {
+ if (instance == null) {
+ return null;
+ }
+
+ return JobDetails.builder()
+ .id(instance.getId())
+ .correlationId(instance.getCorrelationId())
+ .status(mapOptionalValue(instance.getStatus(),
JobStatus::valueOf))
+
.lastUpdate(instance.getLastUpdate().atZoneSameInstant(DEFAULT_ZONE))
+ .retries(instance.getRetries())
+ .executionCounter(instance.getExecutionCounter())
+ .scheduledId(instance.getScheduledId())
+ .priority(instance.getPriority())
+ .recipient(mapOptionalValue(instance.getRecipient(), recipient
-> recipientMarshaller.unmarshall(JsonObject.mapFrom(recipient))))
+ .trigger(mapOptionalValue(instance.getTrigger(), trigger ->
triggerMarshaller.unmarshall(JsonObject.mapFrom(trigger))))
+ .executionTimeout(instance.getExecutionTimeout())
+
.executionTimeoutUnit(mapOptionalValue(instance.getExecutionTimeoutUnit(),
ChronoUnit::valueOf))
+ .created(instance.getCreated().atZoneSameInstant(DEFAULT_ZONE))
+ .build();
+ }
+
+ private <T, R> R mapOptionalValue(T object, Function<T, R> mapper) {
+ return Optional.ofNullable(object)
+ .map(mapper)
+ .orElse(null);
+ }
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
new file mode 100644
index 000000000..0634dc844
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
+import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
+import
org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity;
+import
org.kie.kogito.jobs.service.repository.jpa.repository.JobServiceManagementEntityRepository;
+import
org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.quarkus.panache.common.Parameters;
+import io.smallrye.mutiny.Uni;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import static java.time.OffsetDateTime.now;
+
+@ApplicationScoped
+public class JPAReactiveJobServiceManagementRepository implements
JobServiceManagementRepository {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JPAReactiveJobServiceManagementRepository.class);
+
+ private final JobServiceManagementEntityRepository repository;
+ private final ReactiveRepositoryHelper reactiveRepositoryHelper;
+
+ @Inject
+ public
JPAReactiveJobServiceManagementRepository(JobServiceManagementEntityRepository
repository,
+ ReactiveRepositoryHelper reactiveRepositoryHelper) {
+ this.repository = repository;
+ this.reactiveRepositoryHelper = reactiveRepositoryHelper;
+ }
+
+ @Override
+ public Uni<JobServiceManagementInfo> getAndUpdate(String id,
Function<JobServiceManagementInfo, JobServiceManagementInfo> computeUpdate) {
+ LOGGER.info("get {}", id);
+ return Uni.createFrom()
+ .completionStage(this.reactiveRepositoryHelper.runAsync(() ->
doGetAndUpdate(id, computeUpdate)))
+ .onItem().ifNotNull().invoke(info -> LOGGER.trace("got {}",
info));
+ }
+
+ private JobServiceManagementInfo doGetAndUpdate(String id,
Function<JobServiceManagementInfo, JobServiceManagementInfo> computeUpdate) {
+
+ JobServiceManagementInfo info = this.repository.findByIdOptional(id)
+ .map(this::from)
+ .orElse(null);
+
+ return this.update(computeUpdate.apply(info));
+ }
+
+ @Override
+ public Uni<JobServiceManagementInfo> set(JobServiceManagementInfo info) {
+ LOGGER.info("set {}", info);
+ return
Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() ->
this.doSet(info)));
+ }
+
+ public JobServiceManagementInfo doSet(JobServiceManagementInfo info) {
+ return this.update(info);
+ }
+
+ private JobServiceManagementInfo update(JobServiceManagementInfo info) {
+
+ if (Objects.isNull(info)) {
+ return null;
+ }
+
+ JobServiceManagementEntity jobService =
this.repository.findByIdOptional(info.getId()).orElse(new
JobServiceManagementEntity());
+
+ jobService.setId(info.getId());
+ jobService.setToken(info.getToken());
+ jobService.setLastHeartBeat(info.getLastHeartbeat());
+
+ repository.persist(jobService);
+
+ return from(jobService);
+ }
+
+ @Override
+ public Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo
info) {
+ return
Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() ->
this.doHeartbeat(info)));
+ }
+
+ private JobServiceManagementEntity findById(String id) {
+ return repository.findById(id);
+ }
+
+ private JobServiceManagementEntity
findByIdAndToken(JobServiceManagementInfo info) {
+ return
repository.find("#JobServiceManagementEntity.GetServiceByIdAndToken",
Parameters.with("id", info.getId()).and("token", info.getToken()).map())
+ .firstResultOptional().orElse(null);
+ }
+
+ private JobServiceManagementInfo doHeartbeat(JobServiceManagementInfo
info) {
+ JobServiceManagementEntity jobService = findByIdAndToken(info);
+
+ if (jobService == null) {
+ return null;
+ }
+
+ jobService.setLastHeartBeat(now());
+ repository.persist(jobService);
+
+ return from(jobService);
+ }
+
+ JobServiceManagementInfo from(JobServiceManagementEntity jobService) {
+ if (Objects.isNull(jobService)) {
+ return null;
+ }
+ return new JobServiceManagementInfo(jobService.getId(),
jobService.getToken(), jobService.getLastHeartBeat());
+ }
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
new file mode 100644
index 000000000..b269ab9ea
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa.converter;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import jakarta.persistence.AttributeConverter;
+
+public class JsonBinaryConverter implements AttributeConverter<ObjectNode,
byte[]> {
+
+ @Override
+ public byte[] convertToDatabaseColumn(ObjectNode attribute) {
+ try {
+ return attribute == null ? null :
ObjectMapperFactory.get().writeValueAsBytes(attribute);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public ObjectNode convertToEntityAttribute(byte[] dbData) {
+ try {
+ return dbData == null ? null :
ObjectMapperFactory.get().readValue(dbData, ObjectNode.class);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
new file mode 100644
index 000000000..9c1b317b4
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa.model;
+
+import java.time.OffsetDateTime;
+
+import
org.kie.kogito.jobs.service.repository.jpa.converter.JsonBinaryConverter;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import jakarta.persistence.*;
+
+@Entity
+@Table(name = "job_details",
+ indexes = {
+ @Index(name = "job_details_fire_time_idx", columnList =
"fire_time"),
+ @Index(name = "job_details_created_idx", columnList =
"created")
+ })
+public class JobDetailsEntity {
+
+ @Id
+ private String id;
+
+ @Column(name = "correlation_id")
+ private String correlationId;
+
+ private String status;
+
+ @Column(name = "last_update")
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime lastUpdate;
+
+ private Integer retries;
+
+ @Column(name = "execution_counter")
+ private Integer executionCounter;
+
+ @Column(name = "scheduled_id")
+ private String scheduledId;
+
+ private Integer priority;
+
+ @Convert(converter = JsonBinaryConverter.class)
+ private ObjectNode recipient;
+
+ @Convert(converter = JsonBinaryConverter.class)
+ private ObjectNode trigger;
+
+ @Column(name = "fire_time")
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime fireTime;
+
+ @Column(name = "execution_timeout")
+ private Long executionTimeout;
+ @Column(name = "execution_timeout_unit")
+ private String executionTimeoutUnit;
+
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime created;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ public void setCorrelationId(String correlationId) {
+ this.correlationId = correlationId;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public OffsetDateTime getLastUpdate() {
+ return lastUpdate;
+ }
+
+ public void setLastUpdate(OffsetDateTime lastUpdate) {
+ this.lastUpdate = lastUpdate;
+ }
+
+ public Integer getRetries() {
+ return retries;
+ }
+
+ public void setRetries(Integer retries) {
+ this.retries = retries;
+ }
+
+ public Integer getExecutionCounter() {
+ return executionCounter;
+ }
+
+ public void setExecutionCounter(Integer executionCounter) {
+ this.executionCounter = executionCounter;
+ }
+
+ public String getScheduledId() {
+ return scheduledId;
+ }
+
+ public void setScheduledId(String scheduledId) {
+ this.scheduledId = scheduledId;
+ }
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+ public void setPriority(Integer priority) {
+ this.priority = priority;
+ }
+
+ public ObjectNode getRecipient() {
+ return recipient;
+ }
+
+ public void setRecipient(ObjectNode recipient) {
+ this.recipient = recipient;
+ }
+
+ public ObjectNode getTrigger() {
+ return trigger;
+ }
+
+ public void setTrigger(ObjectNode trigger) {
+ this.trigger = trigger;
+ }
+
+ public OffsetDateTime getFireTime() {
+ return fireTime;
+ }
+
+ public void setFireTime(OffsetDateTime fireTime) {
+ this.fireTime = fireTime;
+ }
+
+ public Long getExecutionTimeout() {
+ return executionTimeout;
+ }
+
+ public void setExecutionTimeout(Long executionTimeout) {
+ this.executionTimeout = executionTimeout;
+ }
+
+ public String getExecutionTimeoutUnit() {
+ return executionTimeoutUnit;
+ }
+
+ public void setExecutionTimeoutUnit(String executionTimeoutUnit) {
+ this.executionTimeoutUnit = executionTimeoutUnit;
+ }
+
+ public OffsetDateTime getCreated() {
+ return created;
+ }
+
+ public void setCreated(OffsetDateTime created) {
+ this.created = created;
+ }
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
new file mode 100644
index 000000000..77ac0a67a
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.model;
+
+import java.time.OffsetDateTime;
+
+import jakarta.persistence.*;
+
+@Entity
+@NamedQuery(name = "JobServiceManagementEntity.GetServiceByIdAndToken",
+ query = "select service " +
+ "from JobServiceManagementEntity service " +
+ "where service.id = :id and service.token = :token")
+@Table(name = "job_service_management")
+public class JobServiceManagementEntity {
+
+ @Id
+ private String id;
+
+ @Column(name = "last_heartbeat")
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime lastHeartBeat;
+
+ private String token;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public OffsetDateTime getLastHeartBeat() {
+ return lastHeartBeat;
+ }
+
+ public void setLastHeartBeat(OffsetDateTime lastHeartBeat) {
+ this.lastHeartBeat = lastHeartBeat;
+ }
+
+ public String getToken() {
+ return token;
+ }
+
+ public void setToken(String token) {
+ this.token = token;
+ }
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
new file mode 100644
index 000000000..dd1003f63
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.repository;
+
+import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity;
+
+import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JobDetailsEntityRepository implements
PanacheRepositoryBase<JobDetailsEntity, String> {
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
new file mode 100644
index 000000000..1a528ccb2
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.repository;
+
+import
org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity;
+
+import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JobServiceManagementEntityRepository implements
PanacheRepositoryBase<JobServiceManagementEntity, String> {
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
new file mode 100644
index 000000000..9dab31a5b
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.utils;
+
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import io.quarkus.narayana.jta.QuarkusTransaction;
+import io.quarkus.narayana.jta.TransactionRunnerOptions;
+import io.vertx.core.Vertx;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+public class ReactiveRepositoryHelper {
+
+ private Vertx vertx;
+
+ @Inject
+ public ReactiveRepositoryHelper(Vertx vertx) {
+ this.vertx = vertx;
+ }
+
+ public <T> CompletionStage<T> runAsync(Supplier<T> blockingFunction) {
+ return vertx.executeBlocking(() ->
wrapInTransaction(blockingFunction)).toCompletionStage();
+ }
+
+ private <T> T wrapInTransaction(Supplier<T> function) {
+ TransactionRunnerOptions runner = QuarkusTransaction.isActive() ?
QuarkusTransaction.joiningExisting() : QuarkusTransaction.requiringNew();
+
+ return runner.call(function::get);
+ }
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml
b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a0eb9fbf8
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,20 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
diff --git
a/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
new file mode 100644
index 000000000..b48a32a95
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+create table job_details
+(
+ id varchar(50) primary key,
+ correlation_id varchar(50),
+ status varchar(40),
+ last_update timestamp,
+ retries integer,
+ execution_counter integer,
+ scheduled_id varchar(40),
+ priority integer,
+ recipient varbinary(max),
+ trigger varbinary(max),
+ fire_time timestamp,
+ execution_timeout bigint,
+ execution_timeout_unit varchar(40),
+ created timestamp
+);
+
+create index job_details_fire_time_idx on job_details (fire_time);
+create index job_details_created_idx on job_details (created);
+
+CREATE TABLE job_service_management
+(
+ id varchar(40) primary key,
+ last_heartbeat timestamp,
+ token varchar(40) unique
+);
\ No newline at end of file
diff --git
a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
new file mode 100644
index 000000000..d1fe3cafe
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.impl.BaseJobRepositoryTest;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+public class JPAReactiveJobRepositoryTest extends BaseJobRepositoryTest {
+
+ @Inject
+ JPAReactiveJobRepository tested;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+
+ super.setUp();
+ }
+
+ @Override
+ public ReactiveJobRepository tested() {
+ return tested;
+ }
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
new file mode 100644
index 000000000..a0a18cab4
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.time.OffsetDateTime;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
+import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+class JPAReactiveJobServiceManagementRepositoryTest {
+
+ @Inject
+ JobServiceManagementRepository tested;
+
+ @BeforeEach
+ void setUp() {
+ }
+
+ @Test
+ void testGetAndUpdate() {
+ String id = "instance-id-1";
+ String token = "token1";
+ create(id, token);
+
+ AtomicReference<OffsetDateTime> date = new AtomicReference<>();
+ JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> {
+ date.set(DateUtil.now().toOffsetDateTime());
+ info.setLastHeartbeat(date.get());
+ return info;
+ }).await().indefinitely();
+ assertThat(updated.getId()).isEqualTo(id);
+ assertThat(date.get()).isNotNull();
+ assertThat(updated.getLastHeartbeat()).isEqualTo(date.get());
+ assertThat(updated.getToken()).isEqualTo(token);
+ }
+
+ @Test
+ void testGetAndUpdateNotExisting() {
+ String id = "instance-id-2";
+ AtomicReference<JobServiceManagementInfo> found = new
AtomicReference<>(new JobServiceManagementInfo());
+ JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> {
+ found.set(info);
+ return info;
+ }).await().indefinitely();
+ assertThat(updated).isNull();
+ assertThat(found.get()).isNull();
+ }
+
+ private JobServiceManagementInfo create(String id, String token) {
+ JobServiceManagementInfo created = tested.set(new
JobServiceManagementInfo(id, token, null)).await().indefinitely();
+ assertThat(created.getId()).isEqualTo(id);
+ assertThat(created.getToken()).isEqualTo(token);
+ assertThat(created.getLastHeartbeat()).isNull();
+ return created;
+ }
+
+ @Test
+ void testHeartbeat() {
+ String id = "instance-id-3";
+ String token = "token3";
+ JobServiceManagementInfo created = create(id, token);
+
+ JobServiceManagementInfo updated =
tested.heartbeat(created).await().indefinitely();
+ assertThat(updated.getLastHeartbeat()).isNotNull();
+
assertThat(updated.getLastHeartbeat()).isBefore(DateUtil.now().plusSeconds(1).toOffsetDateTime());
+ }
+
+ @Test
+ void testConflictHeartbeat() {
+ String id = "instance-id-4";
+ String token = "token4";
+ create(id, token);
+
+ JobServiceManagementInfo updated = tested.heartbeat(new
JobServiceManagementInfo(id, "differentToken", null)).await().indefinitely();
+ assertThat(updated).isNull();
+ }
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
new file mode 100644
index 000000000..0fdc67d71
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.resource;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+public class JPAJobResourceTest extends BaseJobResourceTest {
+
+}
diff --git
a/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
new file mode 100644
index 000000000..ef10d9621
--- /dev/null
+++
b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Kogito
+kogito.apps.persistence.type=jdbc
+# Data source
+quarkus.datasource.db-kind=h2
+quarkus.datasource.username=kogito
+quarkus.datasource.jdbc.url=jdbc:h2:mem:default;NON_KEYWORDS=VALUE,KEY
+quarkus.flyway.migrate-at-start=true
+quarkus.flyway.clean-at-start=true
+quarkus.flyway.locations=db/jobs-service
+# Disabling Security for tests
+quarkus.oidc.enabled=false
+quarkus.oidc.tenant-enabled=false
+quarkus.oidc.auth-server-url=none
+quarkus.keycloak.devservices.enabled=false
\ No newline at end of file
diff --git a/jobs-service/pom.xml b/jobs-service/pom.xml
index df5f21f77..8eb5f0ff2 100644
--- a/jobs-service/pom.xml
+++ b/jobs-service/pom.xml
@@ -39,6 +39,7 @@
<module>jobs-recipients</module>
<module>jobs-service-common</module>
<module>jobs-service-postgresql-common</module>
+ <module>jobs-service-storage-jpa</module>
<module>jobs-service-postgresql</module>
<module>jobs-service-inmemory</module>
<module>kogito-addons-jobs-service</module>
diff --git a/kogito-apps-bom/pom.xml b/kogito-apps-bom/pom.xml
index 5e903d497..774cd4696 100644
--- a/kogito-apps-bom/pom.xml
+++ b/kogito-apps-bom/pom.xml
@@ -78,6 +78,17 @@
<version>${project.version}</version>
<classifier>sources</classifier>
</dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-storage-jpa</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-storage-jpa</artifactId>
+ <version>${project.version}</version>
+ <classifier>sources</classifier>
+ </dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>jobs-service-postgresql</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]