lburgazzoli commented on a change in pull request #158: Support cloud events spec v3
URL: https://github.com/apache/camel-k-runtime/pull/158#discussion_r335027384
########## File path: camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java ########## @@ -24,58 +24,32 @@ import java.util.Objects; import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.knative.KnativeEndpoint; import org.apache.camel.component.knative.spi.CloudEvent; -import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; -import org.apache.commons.lang3.StringUtils; -import static org.apache.camel.util.ObjectHelper.ifNotEmpty; - -final class CloudEventV02Processor implements CloudEventProcessor { +abstract class AbstractCloudEventProcessor implements CloudEventProcessor { private final CloudEvent cloudEvent; - public CloudEventV02Processor() { - this.cloudEvent = CloudEvents.V02; + protected AbstractCloudEventProcessor(CloudEvent cloudEvent) { + this.cloudEvent = cloudEvent; } @Override public CloudEvent cloudEvent() { return cloudEvent; } - @Override - public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { - return exchange -> { - String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); - if (eventType == null) { - eventType = endpoint.getConfiguration().getCloudEventsType(); - } - - final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); - final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); - final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); - final Map<String, Object> headers = exchange.getIn().getHeaders(); - - headers.putIfAbsent(cloudEvent.attributes().id(), exchange.getExchangeId()); - headers.putIfAbsent(cloudEvent.attributes().source(), endpoint.getEndpointUri()); - headers.putIfAbsent(cloudEvent.attributes().spec(), 
cloudEvent.version()); - headers.putIfAbsent(cloudEvent.attributes().type(), eventType); - headers.putIfAbsent(cloudEvent.attributes().time(), eventTime); - headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); - - // Always remove host so it's always computed from the URL and not inherited from the exchange - headers.remove("Host"); - }; - } - @SuppressWarnings("unchecked") @Override public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { return exchange -> { + if (Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_BATCH_CONTENT_MODE)) { + throw new UnsupportedOperationException("Batched CloudEvents are not yet supported"); Review comment: https://github.com/apache/camel-k-runtime/issues/159 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services