This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 99c7f5d6ac [Fix_3383] Adding binary cloud event support for outgoing 
messages (#3386)
99c7f5d6ac is described below

commit 99c7f5d6ac500d08a645f6c35ed28c099aa816c0
Author: Francisco Javier Tirado Sarti 
<[email protected]>
AuthorDate: Fri Feb 2 14:26:21 2024 +0100

    [Fix_3383] Adding binary cloud event support for outgoing messages (#3386)
    
    * [Fix_3383] Do not include null values on json payload
    
    * Revert "[Fix_3383] Do not include null values on json payload"
    
    This reverts commit 47b9b4d7599badd9f8bdabbec22ddf10ab7b82c7.
    
    * [Fix_#3383] Setting metadata when using binary
---
 .../common/AbstractQuarkusCloudEventEmitter.java   | 30 ++++++++++++++++++----
 .../quarkus/deployment/ChannelInfo.java            | 13 ++++++++--
 .../quarkus/deployment/ChannelMappingStrategy.java | 25 +++++++++++++++++-
 ...ntEmitterGenerator.java => CloudEventMode.java} | 17 +++---------
 .../quarkus/deployment/EventEmitterGenerator.java  |  2 +-
 5 files changed, 64 insertions(+), 23 deletions(-)

diff --git 
a/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
 
b/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
index 2b87934702..ceed88f478 100644
--- 
a/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
+++ 
b/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
@@ -20,10 +20,12 @@ package org.kie.kogito.addon.quarkus.messaging.common;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 
 import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.Metadata;
 import 
org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
 import org.kie.kogito.event.CloudEventMarshaller;
 import org.kie.kogito.event.DataEvent;
@@ -32,6 +34,9 @@ import org.kie.kogito.event.EventMarshaller;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
+import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadataBuilder;
+
 import jakarta.inject.Inject;
 
 public abstract class AbstractQuarkusCloudEventEmitter<M> implements 
EventEmitter {
@@ -49,11 +54,11 @@ public abstract class AbstractQuarkusCloudEventEmitter<M> 
implements EventEmitte
     public CompletionStage<Void> emit(DataEvent<?> dataEvent) {
         logger.debug("publishing event {}", dataEvent);
         try {
-            Message<M> message = 
messageDecorator.decorate(Message.of(getPayload(dataEvent))
+            Message<M> message = 
messageDecorator.decorate(getMessage(dataEvent))
                     .withNack(e -> {
                         logger.error("Error publishing event {}", dataEvent, 
e);
                         return CompletableFuture.completedFuture(null);
-                    }));
+                    });
             emit(message);
             return message.getAck().get();
         } catch (IOException e) {
@@ -69,11 +74,26 @@ public abstract class AbstractQuarkusCloudEventEmitter<M> 
implements EventEmitte
         this.cloudEventMarshaller = marshaller;
     }
 
-    private <T> M getPayload(DataEvent<T> event) throws IOException {
+    private <T> Optional<OutgoingCloudEventMetadata<?>> 
getMetadata(DataEvent<T> event) {
+        if (event.getId() == null || event.getType() == null || 
event.getSource() == null || event.getSpecVersion() == null) {
+            return Optional.empty();
+        }
+        OutgoingCloudEventMetadataBuilder<Object> builder = 
OutgoingCloudEventMetadata.builder().withId(event.getId()).withSource(event.getSource()).withType(event.getType())
+                .withSubject(event.getSubject())
+                
.withDataContentType(event.getDataContentType()).withDataSchema(event.getDataSchema()).withSpecVersion(event.getSpecVersion().name()).withTimestamp(event.getTime().toZonedDateTime());
+        for (String extName : event.getExtensionNames()) {
+            builder.withExtension(extName, event.getExtension(extName));
+        }
+        return Optional.of(builder.build());
+    }
+
+    private <T> Message<M> getMessage(DataEvent<T> event) throws IOException {
         if (cloudEventMarshaller != null) {
-            return 
cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory()));
+            return 
Message.of(cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory())));
         } else if (eventMarshaller != null) {
-            return eventMarshaller.marshall(event.getData());
+            Optional<OutgoingCloudEventMetadata<?>> metadata = 
getMetadata(event);
+            M payload = eventMarshaller.marshall(event.getData());
+            return metadata.isPresent() ? Message.of(payload, 
Metadata.of(metadata.orElseThrow())) : Message.of(payload);
         } else {
             throw new IllegalStateException("Not marshaller has been set for 
emitter " + this);
         }
diff --git 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
index 5d528da314..4db58fae5c 100644
--- 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
+++ 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
@@ -30,10 +30,13 @@ public class ChannelInfo {
     private final boolean isInput;
     private final boolean isDefault;
 
+    private final Optional<CloudEventMode> cloudEventMode;
+
     private final Optional<String> marshaller;
     private final Optional<OnOverflowInfo> onOverflow;
 
-    protected ChannelInfo(String channelName, Collection<String> triggers, 
String className, boolean isInput, boolean isDefault, Optional<String> 
marshaller, Optional<OnOverflowInfo> onOverflow) {
+    protected ChannelInfo(String channelName, Collection<String> triggers, 
String className, boolean isInput, boolean isDefault, Optional<String> 
marshaller, Optional<OnOverflowInfo> onOverflow,
+            Optional<CloudEventMode> cloudEventMode) {
         this.className = className;
         this.channelName = channelName;
         this.isInput = isInput;
@@ -41,6 +44,7 @@ public class ChannelInfo {
         this.triggers = triggers;
         this.marshaller = marshaller;
         this.onOverflow = onOverflow;
+        this.cloudEventMode = cloudEventMode;
     }
 
     public Collection<String> getTriggers() {
@@ -93,9 +97,14 @@ public class ChannelInfo {
         return onOverflow;
     }
 
+    public Optional<CloudEventMode> getCloudEventMode() {
+        return cloudEventMode;
+    }
+
     @Override
     public String toString() {
         return "ChannelInfo [channelName=" + channelName + ", className=" + 
className + ", triggers=" + triggers
-                + ", isInput=" + isInput + ", isDefault=" + isDefault + ", 
marshaller=" + marshaller + "]";
+                + ", isInput=" + isInput + ", isDefault=" + isDefault + ", 
cloudEventMode=" + cloudEventMode
+                + ", marshaller=" + marshaller + ", onOverflow=" + onOverflow 
+ "]";
     }
 }
diff --git 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
index c62f4acfe4..485ce4749e 100644
--- 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
+++ 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
@@ -47,6 +47,8 @@ public class ChannelMappingStrategy {
     private static final String INCOMING_DEFAULT_CHANNEL = 
KOGITO_INCOMING_PREFIX + "defaultName";
     private static final String OUTGOING_DEFAULT_CHANNEL = 
KOGITO_OUTGOING_PREFIX + "defaultName";
 
+    private static final String CLOUD_EVENT_MODE = KOGITO_OUTGOING_PREFIX + 
"cloudEventMode";
+
     private static final String MARSHALLER_PREFIX = KOGITO_MESSAGING_PREFIX + 
"marshaller.";
     private static final String UNMARSHALLLER_PREFIX = KOGITO_MESSAGING_PREFIX 
+ "unmarshaller.";
     private static final String KOGITO_EMITTER_PREFIX = 
KOGITO_MESSAGING_PREFIX + "emitter.";
@@ -96,7 +98,28 @@ public class ChannelMappingStrategy {
         return new ChannelInfo(name, triggers.getOrDefault(name, 
Collections.singleton(name)),
                 getClassName(config.getOptionalValue(getPropertyName(prefix, 
name, "value." + (isInput ? "deserializer" : "serializer")), String.class)), 
isInput,
                 name.equals(defaultChannelName), 
config.getOptionalValue((isInput ? UNMARSHALLLER_PREFIX : MARSHALLER_PREFIX) + 
name, String.class),
-                isInput ? Optional.empty() : onOverflowInfo(config, name));
+                isInput ? Optional.empty() : onOverflowInfo(config, name), 
cloudEventMode(config, name, property));
+    }
+
+    private static Optional<CloudEventMode> cloudEventMode(Config config, 
String name, String property) {
+        if (!config.getOptionalValue("kogito.messaging.as-cloudevents", 
Boolean.class).orElse(true)) {
+            return Optional.empty();
+        }
+        Optional<CloudEventMode> cloudEventMode = getCloudEventMode(config, 
CLOUD_EVENT_MODE + "." + name);
+        if (cloudEventMode.isPresent()) {
+            return cloudEventMode;
+        }
+        cloudEventMode = getCloudEventMode(config, CLOUD_EVENT_MODE);
+        if (cloudEventMode.isPresent()) {
+            return cloudEventMode;
+        }
+        // if no config, infer default from connector type
+        String connector = config.getValue(property, String.class);
+        return Optional.of(connector.equals("quarkus-http") ? 
CloudEventMode.BINARY : CloudEventMode.STRUCTURED);
+    }
+
+    private static Optional<CloudEventMode> getCloudEventMode(Config config, 
String propName) {
+        return config.getOptionalValue(propName, 
String.class).map(String::toUpperCase).map(CloudEventMode::valueOf);
     }
 
     private static Optional<OnOverflowInfo> onOverflowInfo(Config config, 
String name) {
diff --git 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java
similarity index 55%
copy from 
quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
copy to 
quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java
index e2999c9bc4..bb53eaa773 100644
--- 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
+++ 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java
@@ -18,18 +18,7 @@
  */
 package org.kie.kogito.addon.cloudevents.quarkus.deployment;
 
-import org.kie.kogito.codegen.api.context.KogitoBuildContext;
-import org.kie.kogito.event.CloudEventMarshaller;
-import org.kie.kogito.event.EventMarshaller;
-
-public class EventEmitterGenerator extends EventGenerator {
-
-    public EventEmitterGenerator(KogitoBuildContext context, ChannelInfo 
channelInfo) {
-        super(context, channelInfo, "EventEmitter");
-        if (context.getApplicationProperty("kogito.messaging.as-cloudevents", 
Boolean.class).orElse(true)) {
-            generateMarshallerField("marshaller", "setCloudEventMarshaller", 
CloudEventMarshaller.class);
-        } else {
-            generateMarshallerField("marshaller", "setEventDataMarshaller", 
EventMarshaller.class);
-        }
-    }
+public enum CloudEventMode {
+    STRUCTURED,
+    BINARY
 }
diff --git 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
index e2999c9bc4..1dba1d9144 100644
--- 
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
+++ 
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
@@ -26,7 +26,7 @@ public class EventEmitterGenerator extends EventGenerator {
 
     public EventEmitterGenerator(KogitoBuildContext context, ChannelInfo 
channelInfo) {
         super(context, channelInfo, "EventEmitter");
-        if (context.getApplicationProperty("kogito.messaging.as-cloudevents", 
Boolean.class).orElse(true)) {
+        if (channelInfo.getCloudEventMode().filter(mode -> mode == 
CloudEventMode.STRUCTURED).isPresent()) {
             generateMarshallerField("marshaller", "setCloudEventMarshaller", 
CloudEventMarshaller.class);
         } else {
             generateMarshallerField("marshaller", "setEventDataMarshaller", 
EventMarshaller.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to