[
https://issues.apache.org/jira/browse/CAMEL-15885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andrea Cosentino updated CAMEL-15885:
-------------------------------------
Priority: Minor (was: Major)
> AWS KINESIS component not de-aggregating producer side aggregated msgs
> ----------------------------------------------------------------------
>
> Key: CAMEL-15885
> URL: https://issues.apache.org/jira/browse/CAMEL-15885
> Project: Camel
> Issue Type: New Feature
> Components: camel-aws
> Reporter: m
> Priority: Minor
>
> {code:java}
> <quarkus-plugin.version>1.8.1.Final</quarkus-plugin.version>
> <dependency>
> <groupId>org.apache.camel.quarkus</groupId>
> <artifactId>camel-quarkus-aws-kinesis</artifactId>
> </dependency>{code}
>
> Our producer sends zipped messages. When reading those about 1/4 of all msgs
> cannot be unzipped. When digging into the message blob it became clear that
> one message contains multiple data blobs. That's probably due to the producer
> side message aggregation (
> [https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html]).
>
> Instead of a single message the data from the Record entity contains
> obviously several merged data blobs that I'm unable to read (zip error when
> trying to decompress). I'd expect to receive several msgs in my processor
> instead of several msgs merged into one msg blob.
>
> {code:java}
> from("aws-kinesis://" lsc.getStreamName()
> "?amazonKinesisClient=client&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
> .routeId("vssKinesisStream")
> .log("LSC message received.")
>
> .onException(java.util.zip.ZipException.class).process(exchange ->
> { metrics.countInvalidPerSec(); }
> )
> .stop()
> .end()
> .process(exchange ->
> { metrics.countReceivedPerSec();
> Record record = exchange.getIn().getBody(Record.class);
> String message = extractMessageFromRecord(record);
> exchange.getIn().setBody(message);
> metrics.countValidPerSec(); LOG.info("Extracted
> message: " + message); lastReadMsgAge =
> Duration.between(record.getApproximateArrivalTimestamp().toInstant(),
> Instant.now()); }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)