This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a7e38a7d527 Change aws2 kinesis record body type to byte array (#14179)
a7e38a7d527 is described below
commit a7e38a7d52758351e07fb244c14e9e8e61a6cf10
Author: fanyang <[email protected]>
AuthorDate: Sat May 18 16:16:40 2024 +0800
Change aws2 kinesis record body type to byte array (#14179)
Co-authored-by: Fan Yang <[email protected]>
---
.../org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java | 2 +-
.../org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java | 6 ++----
2 files changed, 3 insertions(+), 5 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 93209375dcd..fd9b1ee098d 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -303,7 +303,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
protected Exchange createExchange(Shard shard, Record dataRecord) {
LOG.debug("Received Kinesis record with partition_key={}",
dataRecord.partitionKey());
Exchange exchange = createExchange(true);
- exchange.getIn().setBody(dataRecord.data().asInputStream());
+ exchange.getIn().setBody(dataRecord.data().asByteArray());
exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME,
dataRecord.approximateArrivalTimestamp());
exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY,
dataRecord.partitionKey());
exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER,
dataRecord.sequenceNumber());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index 1763dd3fe83..ce8903aa4ac 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.aws2.kinesis;
-import java.nio.ByteBuffer;
-
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultProducer;
@@ -57,12 +55,12 @@ public class Kinesis2Producer extends DefaultProducer {
}
private PutRecordRequest createRequest(Exchange exchange) {
- ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class);
+ byte[] body = exchange.getIn().getBody(byte[].class);
Object partitionKey =
exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY);
Object sequenceNumber =
exchange.getIn().getHeader(Kinesis2Constants.SEQUENCE_NUMBER);
PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder();
- putRecordRequest.data(SdkBytes.fromByteBuffer(body));
+ putRecordRequest.data(SdkBytes.fromByteArray(body));
putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName());
putRecordRequest.partitionKey(partitionKey.toString());
if (sequenceNumber != null) {