This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 99c7f5d6ac [Fix_3383] Adding binary cloud event support for outgoing messages (#3386)
99c7f5d6ac is described below
commit 99c7f5d6ac500d08a645f6c35ed28c099aa816c0
Author: Francisco Javier Tirado Sarti
<[email protected]>
AuthorDate: Fri Feb 2 14:26:21 2024 +0100
[Fix_3383] Adding binary cloud event support for outgoing messages (#3386)
* [Fix_3383] Do not include null values on json payload
* Revert "[Fix_3383] Do not include null values on json payload"
This reverts commit 47b9b4d7599badd9f8bdabbec22ddf10ab7b82c7.
* [Fix_#3383] Setting metadata when using binary
---
.../common/AbstractQuarkusCloudEventEmitter.java | 30 ++++++++++++++++++----
.../quarkus/deployment/ChannelInfo.java | 13 ++++++++--
.../quarkus/deployment/ChannelMappingStrategy.java | 25 +++++++++++++++++-
...ntEmitterGenerator.java => CloudEventMode.java} | 17 +++---------
.../quarkus/deployment/EventEmitterGenerator.java | 2 +-
5 files changed, 64 insertions(+), 23 deletions(-)
diff --git
a/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
b/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
index 2b87934702..ceed88f478 100644
---
a/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
+++
b/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java
@@ -20,10 +20,12 @@ package org.kie.kogito.addon.quarkus.messaging.common;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.Metadata;
import
org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.DataEvent;
@@ -32,6 +34,9 @@ import org.kie.kogito.event.EventMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
+import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadataBuilder;
+
import jakarta.inject.Inject;
public abstract class AbstractQuarkusCloudEventEmitter<M> implements
EventEmitter {
@@ -49,11 +54,11 @@ public abstract class AbstractQuarkusCloudEventEmitter<M>
implements EventEmitte
public CompletionStage<Void> emit(DataEvent<?> dataEvent) {
logger.debug("publishing event {}", dataEvent);
try {
- Message<M> message =
messageDecorator.decorate(Message.of(getPayload(dataEvent))
+ Message<M> message =
messageDecorator.decorate(getMessage(dataEvent))
.withNack(e -> {
logger.error("Error publishing event {}", dataEvent,
e);
return CompletableFuture.completedFuture(null);
- }));
+ });
emit(message);
return message.getAck().get();
} catch (IOException e) {
@@ -69,11 +74,26 @@ public abstract class AbstractQuarkusCloudEventEmitter<M>
implements EventEmitte
this.cloudEventMarshaller = marshaller;
}
- private <T> M getPayload(DataEvent<T> event) throws IOException {
+ private <T> Optional<OutgoingCloudEventMetadata<?>>
getMetadata(DataEvent<T> event) {
+ if (event.getId() == null || event.getType() == null ||
event.getSource() == null || event.getSpecVersion() == null) {
+ return Optional.empty();
+ }
+ OutgoingCloudEventMetadataBuilder<Object> builder =
OutgoingCloudEventMetadata.builder().withId(event.getId()).withSource(event.getSource()).withType(event.getType())
+ .withSubject(event.getSubject())
+
.withDataContentType(event.getDataContentType()).withDataSchema(event.getDataSchema()).withSpecVersion(event.getSpecVersion().name()).withTimestamp(event.getTime().toZonedDateTime());
+ for (String extName : event.getExtensionNames()) {
+ builder.withExtension(extName, event.getExtension(extName));
+ }
+ return Optional.of(builder.build());
+ }
+
+ private <T> Message<M> getMessage(DataEvent<T> event) throws IOException {
if (cloudEventMarshaller != null) {
- return
cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory()));
+ return
Message.of(cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory())));
} else if (eventMarshaller != null) {
- return eventMarshaller.marshall(event.getData());
+ Optional<OutgoingCloudEventMetadata<?>> metadata =
getMetadata(event);
+ M payload = eventMarshaller.marshall(event.getData());
+ return metadata.isPresent() ? Message.of(payload,
Metadata.of(metadata.orElseThrow())) : Message.of(payload);
} else {
throw new IllegalStateException("Not marshaller has been set for
emitter " + this);
}
diff --git
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
index 5d528da314..4db58fae5c 100644
---
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
+++
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java
@@ -30,10 +30,13 @@ public class ChannelInfo {
private final boolean isInput;
private final boolean isDefault;
+ private final Optional<CloudEventMode> cloudEventMode;
+
private final Optional<String> marshaller;
private final Optional<OnOverflowInfo> onOverflow;
- protected ChannelInfo(String channelName, Collection<String> triggers,
String className, boolean isInput, boolean isDefault, Optional<String>
marshaller, Optional<OnOverflowInfo> onOverflow) {
+ protected ChannelInfo(String channelName, Collection<String> triggers,
String className, boolean isInput, boolean isDefault, Optional<String>
marshaller, Optional<OnOverflowInfo> onOverflow,
+ Optional<CloudEventMode> cloudEventMode) {
this.className = className;
this.channelName = channelName;
this.isInput = isInput;
@@ -41,6 +44,7 @@ public class ChannelInfo {
this.triggers = triggers;
this.marshaller = marshaller;
this.onOverflow = onOverflow;
+ this.cloudEventMode = cloudEventMode;
}
public Collection<String> getTriggers() {
@@ -93,9 +97,14 @@ public class ChannelInfo {
return onOverflow;
}
+ public Optional<CloudEventMode> getCloudEventMode() {
+ return cloudEventMode;
+ }
+
@Override
public String toString() {
return "ChannelInfo [channelName=" + channelName + ", className=" +
className + ", triggers=" + triggers
- + ", isInput=" + isInput + ", isDefault=" + isDefault + ",
marshaller=" + marshaller + "]";
+ + ", isInput=" + isInput + ", isDefault=" + isDefault + ",
cloudEventMode=" + cloudEventMode
+ + ", marshaller=" + marshaller + ", onOverflow=" + onOverflow
+ "]";
}
}
diff --git
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
index c62f4acfe4..485ce4749e 100644
---
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
+++
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java
@@ -47,6 +47,8 @@ public class ChannelMappingStrategy {
private static final String INCOMING_DEFAULT_CHANNEL =
KOGITO_INCOMING_PREFIX + "defaultName";
private static final String OUTGOING_DEFAULT_CHANNEL =
KOGITO_OUTGOING_PREFIX + "defaultName";
+ private static final String CLOUD_EVENT_MODE = KOGITO_OUTGOING_PREFIX +
"cloudEventMode";
+
private static final String MARSHALLER_PREFIX = KOGITO_MESSAGING_PREFIX +
"marshaller.";
private static final String UNMARSHALLLER_PREFIX = KOGITO_MESSAGING_PREFIX
+ "unmarshaller.";
private static final String KOGITO_EMITTER_PREFIX =
KOGITO_MESSAGING_PREFIX + "emitter.";
@@ -96,7 +98,28 @@ public class ChannelMappingStrategy {
return new ChannelInfo(name, triggers.getOrDefault(name,
Collections.singleton(name)),
getClassName(config.getOptionalValue(getPropertyName(prefix,
name, "value." + (isInput ? "deserializer" : "serializer")), String.class)),
isInput,
name.equals(defaultChannelName),
config.getOptionalValue((isInput ? UNMARSHALLLER_PREFIX : MARSHALLER_PREFIX) +
name, String.class),
- isInput ? Optional.empty() : onOverflowInfo(config, name));
+ isInput ? Optional.empty() : onOverflowInfo(config, name),
cloudEventMode(config, name, property));
+ }
+
+ private static Optional<CloudEventMode> cloudEventMode(Config config,
String name, String property) {
+ if (!config.getOptionalValue("kogito.messaging.as-cloudevents",
Boolean.class).orElse(true)) {
+ return Optional.empty();
+ }
+ Optional<CloudEventMode> cloudEventMode = getCloudEventMode(config,
CLOUD_EVENT_MODE + "." + name);
+ if (cloudEventMode.isPresent()) {
+ return cloudEventMode;
+ }
+ cloudEventMode = getCloudEventMode(config, CLOUD_EVENT_MODE);
+ if (cloudEventMode.isPresent()) {
+ return cloudEventMode;
+ }
+ // if no config, infer default from connector type
+ String connector = config.getValue(property, String.class);
+ return Optional.of(connector.equals("quarkus-http") ?
CloudEventMode.BINARY : CloudEventMode.STRUCTURED);
+ }
+
+ private static Optional<CloudEventMode> getCloudEventMode(Config config,
String propName) {
+ return config.getOptionalValue(propName,
String.class).map(String::toUpperCase).map(CloudEventMode::valueOf);
}
private static Optional<OnOverflowInfo> onOverflowInfo(Config config,
String name) {
diff --git
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java
similarity index 55%
copy from
quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
copy to
quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java
index e2999c9bc4..bb53eaa773 100644
---
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
+++
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java
@@ -18,18 +18,7 @@
*/
package org.kie.kogito.addon.cloudevents.quarkus.deployment;
-import org.kie.kogito.codegen.api.context.KogitoBuildContext;
-import org.kie.kogito.event.CloudEventMarshaller;
-import org.kie.kogito.event.EventMarshaller;
-
-public class EventEmitterGenerator extends EventGenerator {
-
- public EventEmitterGenerator(KogitoBuildContext context, ChannelInfo
channelInfo) {
- super(context, channelInfo, "EventEmitter");
- if (context.getApplicationProperty("kogito.messaging.as-cloudevents",
Boolean.class).orElse(true)) {
- generateMarshallerField("marshaller", "setCloudEventMarshaller",
CloudEventMarshaller.class);
- } else {
- generateMarshallerField("marshaller", "setEventDataMarshaller",
EventMarshaller.class);
- }
- }
+public enum CloudEventMode {
+ STRUCTURED,
+ BINARY
}
diff --git
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
index e2999c9bc4..1dba1d9144 100644
---
a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
+++
b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java
@@ -26,7 +26,7 @@ public class EventEmitterGenerator extends EventGenerator {
public EventEmitterGenerator(KogitoBuildContext context, ChannelInfo
channelInfo) {
super(context, channelInfo, "EventEmitter");
- if (context.getApplicationProperty("kogito.messaging.as-cloudevents",
Boolean.class).orElse(true)) {
+ if (channelInfo.getCloudEventMode().filter(mode -> mode ==
CloudEventMode.STRUCTURED).isPresent()) {
generateMarshallerField("marshaller", "setCloudEventMarshaller",
CloudEventMarshaller.class);
} else {
generateMarshallerField("marshaller", "setEventDataMarshaller",
EventMarshaller.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]