[ 
https://issues.apache.org/jira/browse/CAMEL-15885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

m updated CAMEL-15885:
----------------------
    Description: 
{code:java}
<quarkus-plugin.version>1.8.1.Final</quarkus-plugin.version>
<dependency>
   <groupId>org.apache.camel.quarkus</groupId>
   <artifactId>camel-quarkus-aws-kinesis</artifactId>
</dependency>{code}
 
 Our producer sends zipped messages. When reading those about 1/4 of all msgs 
cannot be unzipped. When digging into the message blob it became clear that one 
message contains multiple data entities. That probably due to the producer side 
message aggregation ( 
[https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html]).
  
 Instead of a single message the data from the Record entity contains obviously 
several merged data blobs that I'm unable to read (zip error when trying to 
decompress). I'd expect to receive several msgs in my processor instead of the 
msgs merged into one msg blob.

 
{code:java}
 from("aws-kinesis://"  lsc.getStreamName()  
"?amazonKinesisClient=client&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
                 .routeId("vssKinesisStream")
                 .log("LSC message received.")
                 
.onException(java.util.zip.ZipException.class).process(exchange -> 
{                     metrics.countInvalidPerSec();                 }
)
                 .stop()
                 .end()
                 .process(exchange -> 
{                         metrics.countReceivedPerSec();                        
 Record record = exchange.getIn().getBody(Record.class);                        
 String message = extractMessageFromRecord(record);                         
exchange.getIn().setBody(message);                         
metrics.countValidPerSec();                         LOG.info("Extracted 
message: " + message);                         lastReadMsgAge = 
Duration.between(record.getApproximateArrivalTimestamp().toInstant(), 
Instant.now());                 }
{code}
 
  

  was:
{code:java}
<quarkus-plugin.version>1.8.1.Final</quarkus-plugin.version>
        <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-aws-kinesis</artifactId>
         </dependency>{code}
 
 Our producer sends zipped messages. When reading those about 1/4 of all msgs 
cannot be unzipped. When digging into the message blob it became clear that one 
message contains multiple data entities. That probably due to the producer side 
message aggregation ( 
[https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html]).
  
 Instead of a single message the data from the Record entity contains obviously 
several merged data blobs that I'm unable to read (zip error when trying to 
decompress). I'd expect to receive several msgs in my processor instead of the 
msgs merged into one msg blob.

 
{code:java}
 from("aws-kinesis://"  lsc.getStreamName()  
"?amazonKinesisClient=client&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
                 .routeId("vssKinesisStream")
                 .log("LSC message received.")
                 
.onException(java.util.zip.ZipException.class).process(exchange -> 
{                     metrics.countInvalidPerSec();                 }
)
                 .stop()
                 .end()
                 .process(exchange -> 
{                         metrics.countReceivedPerSec();                        
 Record record = exchange.getIn().getBody(Record.class);                        
 String message = extractMessageFromRecord(record);                         
exchange.getIn().setBody(message);                         
metrics.countValidPerSec();                         LOG.info("Extracted 
message: " + message);                         lastReadMsgAge = 
Duration.between(record.getApproximateArrivalTimestamp().toInstant(), 
Instant.now());                 }
{code}

  
  


> AWS KINESIS component not de-aggregating producer side aggregated msgs
> ----------------------------------------------------------------------
>
>                 Key: CAMEL-15885
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15885
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-aws
>            Reporter: m
>            Priority: Major
>
> {code:java}
> <quarkus-plugin.version>1.8.1.Final</quarkus-plugin.version>
> <dependency>
>    <groupId>org.apache.camel.quarkus</groupId>
>    <artifactId>camel-quarkus-aws-kinesis</artifactId>
> </dependency>{code}
>  
>  Our producer sends zipped messages. When reading those about 1/4 of all msgs 
> cannot be unzipped. When digging into the message blob it became clear that 
> one message contains multiple data entities. That probably due to the 
> producer side message aggregation ( 
> [https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html]).
>   
>  Instead of a single message the data from the Record entity contains 
> obviously several merged data blobs that I'm unable to read (zip error when 
> trying to decompress). I'd expect to receive several msgs in my processor 
> instead of the msgs merged into one msg blob.
>  
> {code:java}
>  from("aws-kinesis://"  lsc.getStreamName()  
> "?amazonKinesisClient=client&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
>                  .routeId("vssKinesisStream")
>                  .log("LSC message received.")
>                  
> .onException(java.util.zip.ZipException.class).process(exchange -> 
> {                     metrics.countInvalidPerSec();                 }
> )
>                  .stop()
>                  .end()
>                  .process(exchange -> 
> {                         metrics.countReceivedPerSec();                      
>    Record record = exchange.getIn().getBody(Record.class);                    
>      String message = extractMessageFromRecord(record);                       
>   exchange.getIn().setBody(message);                         
> metrics.countValidPerSec();                         LOG.info("Extracted 
> message: " + message);                         lastReadMsgAge = 
> Duration.between(record.getApproximateArrivalTimestamp().toInstant(), 
> Instant.now());                 }
> {code}
>  
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to