m created CAMEL-15885:
-------------------------
Summary: AWS KINESIS component not de-aggregating producer side
aggregated msgs
Key: CAMEL-15885
URL: https://issues.apache.org/jira/browse/CAMEL-15885
Project: Camel
Issue Type: Bug
Components: camel-aws
Reporter: m
```<quarkus-plugin.version>1.8.1.Final</quarkus-plugin.version>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-aws-kinesis</artifactId>
</dependency>```
```
from("aws-kinesis://mystream?amazonKinesisClient=client&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
```
Our producer sends zipped messages. When reading those about 1/4 of all msgs
cannot be unzipped. When digging into the message blob it became clear that one
message contains multiple data entities. That probably due to the producer side
message aggregation (
https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
Instead of a single message the data from the Record entity contains obviously
several merged data blobs that I'm unable to read (zip error when trying to
decompress). I'd expect to receive several msgs in my processor instead of the
msgs merged into one msg blob.
```
from("aws-kinesis://" + lsc.getStreamName() +
"?amazonKinesisClient=lscClient&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
.routeId("vssKinesisStream")
.log("LSC message received.")
.onException(java.util.zip.ZipException.class).process(exchange
-> {
metrics.countInvalidPerSec();
})
.stop()
.end()
.process(exchange -> {
metrics.countReceivedPerSec();
Record record = exchange.getIn().getBody(Record.class);
String message = extractMessageFromRecord(record);
exchange.getIn().setBody(message);
metrics.countValidPerSec();
LOG.info("Extracted message: " + message);
lastReadMsgAge =
Duration.between(record.getApproximateArrivalTimestamp().toInstant(),
Instant.now());
});
```
--
This message was sent by Atlassian Jira
(v8.3.4#803005)