[ 
https://issues.apache.org/jira/browse/CAMEL-15885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrea Cosentino updated CAMEL-15885:
-------------------------------------
    Priority: Minor  (was: Major)

> AWS KINESIS component not de-aggregating producer side aggregated msgs
> ----------------------------------------------------------------------
>
>                 Key: CAMEL-15885
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15885
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-aws
>            Reporter: m
>            Priority: Minor
>
> {code:java}
> <quarkus-plugin.version>1.8.1.Final</quarkus-plugin.version>
> <dependency>
>    <groupId>org.apache.camel.quarkus</groupId>
>    <artifactId>camel-quarkus-aws-kinesis</artifactId>
> </dependency>{code}
>  
>  Our producer sends zipped messages. When reading those about 1/4 of all msgs 
> cannot be unzipped. When digging into the message blob it became clear that 
> one message contains multiple data blobs. That's probably due to the producer 
> side message aggregation ( 
> [https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html]).
>   
>  Instead of a single message the data from the Record entity contains 
> obviously several merged data blobs that I'm unable to read (zip error when 
> trying to decompress). I'd expect to receive several msgs in my processor 
> instead of several msgs merged into one msg blob.
>  
> {code:java}
>  from("aws-kinesis://"  lsc.getStreamName()  
> "?amazonKinesisClient=client&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
>                  .routeId("vssKinesisStream")
>                  .log("LSC message received.")
>                  
> .onException(java.util.zip.ZipException.class).process(exchange -> 
> {                     metrics.countInvalidPerSec();                 }
> )
>                  .stop()
>                  .end()
>                  .process(exchange -> 
> {                         metrics.countReceivedPerSec();                      
>    Record record = exchange.getIn().getBody(Record.class);                    
>      String message = extractMessageFromRecord(record);                       
>   exchange.getIn().setBody(message);                         
> metrics.countValidPerSec();                         LOG.info("Extracted 
> message: " + message);                         lastReadMsgAge = 
> Duration.between(record.getApproximateArrivalTimestamp().toInstant(), 
> Instant.now());                 }
> {code}
>  
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to