This is an automated email from the ASF dual-hosted git repository.

wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new a82962149 kie-kogito-apps-2149: Add the ability to send the jobs-service job status change events by using partitioning (#2150)
a82962149 is described below

commit a82962149d0cc1df188a09e084d4804a8767f9f8
Author: Walter Medvedeo <[email protected]>
AuthorDate: Tue Nov 26 19:39:45 2024 +0100

    kie-kogito-apps-2149: Add the ability to send the jobs-service job status change events by using partitioning (#2150)
---
 .../kogito/jobs/service/stream/AbstractJobStreams.java | 18 +++++++++++-------
 .../jobs/service/stream/AbstractJobStreamsTest.java    |  4 ++--
 .../service/messaging/http/stream/HttpJobStreams.java  | 13 ++++++++++++-
 .../messaging/http/stream/HttpJobStreamsTest.java      | 10 ++++++++++
 .../messaging/kafka/stream/KafkaJobStreams.java        | 10 ++++++++++
 .../messaging/kafka/stream/KafkaJobStreamsTest.java    | 12 ++++++++++++
 6 files changed, 57 insertions(+), 10 deletions(-)

diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java
index 64189e2ab..d09d3bbc5 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java
@@ -65,16 +65,12 @@ public abstract class AbstractJobStreams implements 
JobStreams {
     public void jobStatusChange(JobDetails job) {
         if (isEnabled()) {
             try {
-                JobDataEvent event = JobDataEvent
-                        .builder()
-                        .source(url + RestApiConstants.JOBS_PATH)
-                        .data(ScheduledJobAdapter.of(job))//this should 
support jobs crated with V1 and V2
-                        .build();
+                JobDataEvent event = buildEvent(job);
                 LOGGER.debug("emit jobStatusChange, hasRequests: {}, eventId: 
{}, jobDetails: {}", emitter.hasRequests(), event.getId(), job);
                 String json = objectMapper.writeValueAsString(event);
                 emitter.send(decorate(ContextAwareMessage.of(json)
                         .withAck(() -> onAck(event.getId(), job))
-                        .withNack(reason -> onNack(reason, job))));
+                        .withNack(reason -> onNack(reason, job)), event));
             } catch (Exception e) {
                 String msg = String.format("An unexpected error was produced 
while processing a Job status change for the job: %s", job);
                 LOGGER.error(msg, e);
@@ -82,6 +78,14 @@ public abstract class AbstractJobStreams implements 
JobStreams {
         }
     }
 
+    protected JobDataEvent buildEvent(JobDetails job) {
+        return JobDataEvent
+                .builder()
+                .source(url + RestApiConstants.JOBS_PATH)
+                .data(ScheduledJobAdapter.of(job))//this should support jobs 
crated with V1 and V2
+                .build();
+    }
+
     protected CompletionStage<Void> onAck(String eventId, JobDetails job) {
         LOGGER.debug("Job Status change emitted successfully, eventId: {}, 
jobDetails: {}", eventId, job);
         return CompletableFuture.completedFuture(null);
@@ -93,7 +97,7 @@ public abstract class AbstractJobStreams implements 
JobStreams {
         return CompletableFuture.completedFuture(null);
     }
 
-    protected Message<String> decorate(Message<String> message) {
+    protected Message<String> decorate(Message<String> message, JobDataEvent 
event) {
         return message;
     }
 }
diff --git 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java
 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java
index f7ac6ed1c..1f2e3fc33 100644
--- 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java
+++ 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java
@@ -58,7 +58,7 @@ public abstract class AbstractJobStreamsTest<T extends 
AbstractJobStreams> {
     protected static final String URL = "http://localhost:8180";
     private static final String SERIALIZED_MESSAGE = "SERIALIZED_MESSAGE";
 
-    private static final String JOB_ID = "JOB_ID";
+    protected static final String JOB_ID = "JOB_ID";
     private static final String CORRELATION_ID = "CORRELATION_ID";
     private static final JobStatus STATUS = JobStatus.SCHEDULED;
     private static final ZonedDateTime LAST_UPDATE = 
ZonedDateTime.parse("2022-08-03T18:00:15.001+01:00");
@@ -170,7 +170,7 @@ public abstract class AbstractJobStreamsTest<T extends 
AbstractJobStreams> {
 
     }
 
-    private void assertExpectedEvent(JobDataEvent event) {
+    protected void assertExpectedEvent(JobDataEvent event) {
         assertThat(event.getId()).isNotNull();
         assertThat(event.getType()).isEqualTo(JobDataEvent.JOB_EVENT_TYPE);
         assertThat(event.getSource()).hasToString(URL + "/jobs");
diff --git 
a/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java
 
b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java
index 52bb50d35..61a2efeba 100644
--- 
a/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java
+++ 
b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java
@@ -26,6 +26,7 @@ import org.eclipse.microprofile.reactive.messaging.Channel;
 import org.eclipse.microprofile.reactive.messaging.Emitter;
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.messaging.OnOverflow;
+import org.kie.kogito.jobs.service.events.JobDataEvent;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.stream.AbstractJobStreams;
 import org.slf4j.Logger;
@@ -45,6 +46,7 @@ public class HttpJobStreams extends AbstractJobStreams {
 
     public static final String PUBLISH_EVENTS_CONFIG_KEY = 
"kogito.jobs-service.http.job-status-change-events";
     public static final String JOB_STATUS_CHANGE_EVENTS_HTTP = 
"kogito-job-service-job-status-events-http";
+    public static final String PARTITION_KEY_EXTENSION = "partitionkey";
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HttpJobStreams.class);
 
@@ -70,7 +72,16 @@ public class HttpJobStreams extends AbstractJobStreams {
     }
 
     @Override
-    protected Message<String> decorate(Message<String> message) {
+    protected JobDataEvent buildEvent(JobDetails job) {
+        JobDataEvent event = super.buildEvent(job);
+        // use the well-known extension https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/partitioning.md
+        // to instruct potential http driven Brokers like, Knative Eventing Kafka Broker, to process accordingly.
+        event.addExtensionAttribute(PARTITION_KEY_EXTENSION, 
event.getData().getId());
+        return event;
+    }
+
+    @Override
+    protected Message<String> decorate(Message<String> message, JobDataEvent 
event) {
         return message.addMetadata(OUTGOING_HTTP_METADATA.get());
     }
 }
diff --git 
a/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java
 
b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java
index 8449d88da..7b951b5e3 100644
--- 
a/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java
+++ 
b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java
@@ -21,6 +21,7 @@ package org.kie.kogito.jobs.service.messaging.http.stream;
 import java.util.Optional;
 
 import org.eclipse.microprofile.reactive.messaging.Message;
+import org.kie.kogito.jobs.service.events.JobDataEvent;
 import org.kie.kogito.jobs.service.stream.AbstractJobStreamsTest;
 
 import io.cloudevents.jackson.JsonFormat;
@@ -29,6 +30,7 @@ import 
io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata;
 import jakarta.ws.rs.core.HttpHeaders;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static 
org.kie.kogito.jobs.service.messaging.http.stream.HttpJobStreams.PARTITION_KEY_EXTENSION;
 
 class HttpJobStreamsTest extends AbstractJobStreamsTest<HttpJobStreams> {
 
@@ -44,4 +46,12 @@ class HttpJobStreamsTest extends 
AbstractJobStreamsTest<HttpJobStreams> {
         assertThat(metadata.getHeaders()).hasSize(1);
         
assertThat(metadata.getHeaders().get(HttpHeaders.CONTENT_TYPE)).containsExactlyInAnyOrder(JsonFormat.CONTENT_TYPE);
     }
+
+    @Override
+    protected void assertExpectedEvent(JobDataEvent event) {
+        super.assertExpectedEvent(event);
+        assertThat(event.getExtension(PARTITION_KEY_EXTENSION))
+                .isNotNull()
+                .isEqualTo(JOB_ID);
+    }
 }
diff --git 
a/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java
 
b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java
index c9b149ede..315d89dff 100644
--- 
a/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java
+++ 
b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java
@@ -23,7 +23,9 @@ import java.util.Optional;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.eclipse.microprofile.reactive.messaging.Channel;
 import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.messaging.OnOverflow;
+import org.kie.kogito.jobs.service.events.JobDataEvent;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.stream.AbstractJobStreams;
 import org.kie.kogito.jobs.service.stream.AvailableStreams;
@@ -32,6 +34,8 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
+
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 
@@ -54,4 +58,10 @@ public class KafkaJobStreams extends AbstractJobStreams {
         LOGGER.debug("jobStatusChange call received, enabled: {}, job: {}", 
enabled, job);
         super.jobStatusChange(job);
     }
+
+    @Override
+    protected Message<String> decorate(Message<String> message, JobDataEvent 
event) {
+        // regular kafka partitioning.
+        return 
message.addMetadata(OutgoingKafkaRecordMetadata.builder().withKey(event.getData().getId()).build());
+    }
 }
diff --git 
a/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java
 
b/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java
index a0dbc6c95..d5be8d1f5 100644
--- 
a/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java
+++ 
b/jobs-service/jobs-service-messaging-kafka/src/test/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreamsTest.java
@@ -20,12 +20,24 @@ package org.kie.kogito.jobs.service.messaging.kafka.stream;
 
 import java.util.Optional;
 
+import org.eclipse.microprofile.reactive.messaging.Message;
 import org.kie.kogito.jobs.service.stream.AbstractJobStreamsTest;
 
+import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
 class KafkaJobStreamsTest extends AbstractJobStreamsTest<KafkaJobStreams> {
 
     @Override
     protected KafkaJobStreams createJobStreams() {
         return new KafkaJobStreams(objectMapper, Optional.of(true), emitter, 
URL);
     }
+
+    @Override
+    protected void assertExpectedMetadata(Message<String> message) {
+        OutgoingKafkaRecordMetadata<?> metadata = 
message.getMetadata(OutgoingKafkaRecordMetadata.class).orElse(null);
+        assertThat(metadata).isNotNull();
+        assertThat(metadata.getKey()).isEqualTo(JOB_ID);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to