This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 542f2ba CAMEL-16222: PooledExchangeFactory experiment
542f2ba is described below
commit 542f2ba12d4947eda7fc37216afd169bf4503d8f
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 23 19:05:31 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../aws2/ddbstream/Ddb2StreamConsumer.java | 8 ++-
.../aws2/ddbstream/Ddb2StreamEndpoint.java | 9 ---
.../component/aws2/kinesis/Kinesis2Consumer.java | 13 +++-
.../component/aws2/kinesis/Kinesis2Endpoint.java | 11 ----
.../camel/component/aws2/s3/AWS2S3Consumer.java | 75 ++++++++++++++++++++--
.../camel/component/aws2/s3/AWS2S3Endpoint.java | 72 ---------------------
.../camel/component/aws2/sqs/Sqs2Consumer.java | 52 ++++++++++++++-
.../camel/component/aws2/sqs/Sqs2Endpoint.java | 44 -------------
8 files changed, 137 insertions(+), 147 deletions(-)
diff --git
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index 276195f..32bcb64 100644
---
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -97,6 +97,12 @@ public class Ddb2StreamConsumer extends
ScheduledBatchPollingConsumer {
return processedExchanges;
}
+ protected Exchange createExchange(Record record) {
+ Exchange ex = createExchange(true);
+ ex.getIn().setBody(record, Record.class);
+ return ex;
+ }
+
private DynamoDbStreamsClient getClient() {
return getEndpoint().getClient();
}
@@ -130,7 +136,7 @@ public class Ddb2StreamConsumer extends
ScheduledBatchPollingConsumer {
for (Record record : records) {
BigInteger recordSeqNum = new
BigInteger(record.dynamodb().sequenceNumber());
if (condition == null || condition.matches(providedSeqNum,
recordSeqNum)) {
- exchanges.add(getEndpoint().createExchange(record));
+ exchanges.add(createExchange(record));
}
}
return exchanges;
diff --git
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
index 5c9b0a2..6f314ff 100644
---
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
+++
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
@@ -20,7 +20,6 @@ import java.net.URI;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.UriEndpoint;
@@ -34,7 +33,6 @@ import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import
software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClientBuilder;
import software.amazon.awssdk.utils.AttributeMap;
@@ -69,13 +67,6 @@ public class Ddb2StreamEndpoint extends
ScheduledPollEndpoint {
return consumer;
}
- Exchange createExchange(Record record) {
- Exchange ex = super.createExchange();
- ex.getIn().setBody(record, Record.class);
-
- return ex;
- }
-
@Override
public void doStart() throws Exception {
super.doStart();
diff --git
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index e620181..fcda12b 100644
---
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -75,7 +75,7 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer {
// May cache the last successful sequence number, and pass it to the
// getRecords request. That way, on the next poll, we start from where
// we left off, however, I don't know what happens to subsequent
- // exchanges when an earlier echangee fails.
+ // exchanges when an earlier exchange fails.
currentShardIterator = result.nextShardIterator();
if (isShardClosed) {
@@ -178,11 +178,20 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer {
private Queue<Exchange> createExchanges(List<Record> records) {
Queue<Exchange> exchanges = new ArrayDeque<>();
for (Record record : records) {
- exchanges.add(getEndpoint().createExchange(record));
+ exchanges.add(createExchange(record));
}
return exchanges;
}
+ protected Exchange createExchange(Record record) {
+ Exchange exchange = createExchange(true);
+ exchange.getIn().setBody(record);
+ exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME,
record.approximateArrivalTimestamp());
+ exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY,
record.partitionKey());
+ exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER,
record.sequenceNumber());
+ return exchange;
+ }
+
private boolean hasSequenceNumber() {
return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
&&
(getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
diff --git
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index ac0b0c4..ebe2e31 100644
---
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -20,7 +20,6 @@ import java.net.URI;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.UriEndpoint;
@@ -36,7 +35,6 @@ import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.AttributeMap;
@@ -102,15 +100,6 @@ public class Kinesis2Endpoint extends
ScheduledPollEndpoint {
return consumer;
}
- public Exchange createExchange(Record record) {
- Exchange exchange = super.createExchange();
- exchange.getIn().setBody(record);
- exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME,
record.approximateArrivalTimestamp());
- exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY,
record.partitionKey());
- exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER,
record.sequenceNumber());
- return exchange;
- }
-
public KinesisClient getClient() {
return kinesisClient;
}
diff --git
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index 2776fd0..a4e4fef 100644
---
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.aws2.s3;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
@@ -25,11 +26,14 @@ import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
@@ -52,6 +56,7 @@ import
software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.IoUtils;
/**
* A Consumer of messages from the Amazon Web Service Simple Storage Service
<a href="http://aws.amazon.com/s3/">AWS
@@ -64,7 +69,7 @@ public class AWS2S3Consumer extends
ScheduledBatchPollingConsumer {
private String marker;
private transient String s3ConsumerToString;
- public AWS2S3Consumer(AWS2S3Endpoint endpoint, Processor processor) throws
NoFactoryAvailableException {
+ public AWS2S3Consumer(AWS2S3Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@@ -180,7 +185,7 @@ public class AWS2S3Consumer extends
ScheduledBatchPollingConsumer {
protected Queue<Exchange>
createExchanges(ResponseInputStream<GetObjectResponse> s3Object, String key) {
Queue<Exchange> answer = new LinkedList<>();
- Exchange exchange = getEndpoint().createExchange(s3Object, key);
+ Exchange exchange = createExchange(s3Object, key);
answer.add(exchange);
return answer;
}
@@ -212,7 +217,7 @@ public class AWS2S3Consumer extends
ScheduledBatchPollingConsumer {
if (includeS3Object(s3Object)) {
s3Objects.add(s3Object);
- Exchange exchange = getEndpoint().createExchange(s3Object,
s3ObjectSummary.key());
+ Exchange exchange = createExchange(s3Object,
s3ObjectSummary.key());
answer.add(exchange);
} else {
// If includeFolders != true and the object is not
included, it is safe to close the object here.
@@ -238,7 +243,6 @@ public class AWS2S3Consumer extends
ScheduledBatchPollingConsumer {
* @return true to include, false to exclude
*/
protected boolean includeS3Object(ResponseInputStream<GetObjectResponse>
s3Object) {
-
if (getConfiguration().isIncludeFolders()) {
return true;
} else {
@@ -365,6 +369,67 @@ public class AWS2S3Consumer extends
ScheduledBatchPollingConsumer {
return (AWS2S3Endpoint) super.getEndpoint();
}
+ public Exchange createExchange(ResponseInputStream<GetObjectResponse>
s3Object, String key) {
+ return createExchange(getEndpoint().getExchangePattern(), s3Object,
key);
+ }
+
+ public Exchange createExchange(ExchangePattern pattern,
ResponseInputStream<GetObjectResponse> s3Object, String key) {
+ LOG.trace("Getting object with key [{}] from bucket [{}]...", key,
getConfiguration().getBucketName());
+
+ LOG.trace("Got object [{}]", s3Object);
+
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(pattern);
+ Message message = exchange.getIn();
+
+ if (getConfiguration().isIncludeBody()) {
+ try {
+ message.setBody(IoUtils.toByteArray(s3Object));
+ } catch (IOException e) {
+ throw new RuntimeCamelException(e);
+ }
+ } else {
+ message.setBody(s3Object);
+ }
+
+ message.setHeader(AWS2S3Constants.KEY, key);
+ message.setHeader(AWS2S3Constants.BUCKET_NAME,
getConfiguration().getBucketName());
+ message.setHeader(AWS2S3Constants.E_TAG, s3Object.response().eTag());
+ message.setHeader(AWS2S3Constants.LAST_MODIFIED,
s3Object.response().lastModified());
+ message.setHeader(AWS2S3Constants.VERSION_ID,
s3Object.response().versionId());
+ message.setHeader(AWS2S3Constants.CONTENT_TYPE,
s3Object.response().contentType());
+ message.setHeader(AWS2S3Constants.CONTENT_LENGTH,
s3Object.response().contentLength());
+ message.setHeader(AWS2S3Constants.CONTENT_ENCODING,
s3Object.response().contentEncoding());
+ message.setHeader(AWS2S3Constants.CONTENT_DISPOSITION,
s3Object.response().contentDisposition());
+ message.setHeader(AWS2S3Constants.CACHE_CONTROL,
s3Object.response().cacheControl());
+ message.setHeader(AWS2S3Constants.SERVER_SIDE_ENCRYPTION,
s3Object.response().serverSideEncryption());
+ message.setHeader(AWS2S3Constants.EXPIRATION_TIME,
s3Object.response().expiration());
+ message.setHeader(AWS2S3Constants.REPLICATION_STATUS,
s3Object.response().replicationStatus());
+ message.setHeader(AWS2S3Constants.STORAGE_CLASS,
s3Object.response().storageClass());
+ message.setHeader(AWS2S3Constants.METADATA,
s3Object.response().metadata());
+
+ /*
+ * If includeBody == true, it is safe to close the object here because
the S3Object
+ * was consumed already. If includeBody != true, the caller is
responsible for
+ * closing the stream once the body has been fully consumed or use the
autoCloseBody
+ * configuration to automatically schedule the body closing at the end
of exchange.
+ */
+ if (getConfiguration().isIncludeBody()) {
+ IOHelper.close(s3Object);
+ } else {
+ if (getConfiguration().isAutocloseBody()) {
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(new
SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ IOHelper.close(s3Object);
+ }
+ });
+ }
+ }
+
+ return exchange;
+ }
+
@Override
public String toString() {
if (s3ConsumerToString == null) {
diff --git
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
index 8be1966..9cad4ee 100644
---
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
+++
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
@@ -16,37 +16,25 @@
*/
package org.apache.camel.component.aws2.s3;
-import java.io.IOException;
-
import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.aws2.s3.client.AWS2S3ClientFactory;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.ScheduledPollEndpoint;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
-import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.PutBucketPolicyRequest;
-import software.amazon.awssdk.utils.IoUtils;
/**
* Store and retrieve objects from AWS S3 Storage Service using AWS SDK
version 2.x.
@@ -152,66 +140,6 @@ public class AWS2S3Endpoint extends ScheduledPollEndpoint {
super.doStop();
}
- public Exchange createExchange(ResponseInputStream<GetObjectResponse>
s3Object, String key) {
- return createExchange(getExchangePattern(), s3Object, key);
- }
-
- public Exchange createExchange(ExchangePattern pattern,
ResponseInputStream<GetObjectResponse> s3Object, String key) {
- LOG.trace("Getting object with key [{}] from bucket [{}]...", key,
getConfiguration().getBucketName());
-
- LOG.trace("Got object [{}]", s3Object);
-
- Exchange exchange = super.createExchange(pattern);
- Message message = exchange.getIn();
-
- if (configuration.isIncludeBody()) {
- try {
- message.setBody(IoUtils.toByteArray(s3Object));
- } catch (IOException e) {
- throw new RuntimeCamelException(e);
- }
- } else {
- message.setBody(s3Object);
- }
-
- message.setHeader(AWS2S3Constants.KEY, key);
- message.setHeader(AWS2S3Constants.BUCKET_NAME,
getConfiguration().getBucketName());
- message.setHeader(AWS2S3Constants.E_TAG, s3Object.response().eTag());
- message.setHeader(AWS2S3Constants.LAST_MODIFIED,
s3Object.response().lastModified());
- message.setHeader(AWS2S3Constants.VERSION_ID,
s3Object.response().versionId());
- message.setHeader(AWS2S3Constants.CONTENT_TYPE,
s3Object.response().contentType());
- message.setHeader(AWS2S3Constants.CONTENT_LENGTH,
s3Object.response().contentLength());
- message.setHeader(AWS2S3Constants.CONTENT_ENCODING,
s3Object.response().contentEncoding());
- message.setHeader(AWS2S3Constants.CONTENT_DISPOSITION,
s3Object.response().contentDisposition());
- message.setHeader(AWS2S3Constants.CACHE_CONTROL,
s3Object.response().cacheControl());
- message.setHeader(AWS2S3Constants.SERVER_SIDE_ENCRYPTION,
s3Object.response().serverSideEncryption());
- message.setHeader(AWS2S3Constants.EXPIRATION_TIME,
s3Object.response().expiration());
- message.setHeader(AWS2S3Constants.REPLICATION_STATUS,
s3Object.response().replicationStatus());
- message.setHeader(AWS2S3Constants.STORAGE_CLASS,
s3Object.response().storageClass());
- message.setHeader(AWS2S3Constants.METADATA,
s3Object.response().metadata());
-
- /*
- * If includeBody == true, it is safe to close the object here because
the S3Object
- * was consumed already. If includeBody != true, the caller is
responsible for
- * closing the stream once the body has been fully consumed or use the
autoCloseBody
- * configuration to automatically schedule the body closing at the end
of exchange.
- */
- if (configuration.isIncludeBody()) {
- IOHelper.close(s3Object);
- } else {
- if (configuration.isAutocloseBody()) {
- exchange.adapt(ExtendedExchange.class).addOnCompletion(new
SynchronizationAdapter() {
- @Override
- public void onDone(Exchange exchange) {
- IOHelper.close(s3Object);
- }
- });
- }
- }
-
- return exchange;
- }
-
public AWS2S3Configuration getConfiguration() {
return configuration;
}
diff --git
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 660df45..3cc5cb7 100644
---
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -18,17 +18,21 @@ package org.apache.camel.component.aws2.sqs;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
@@ -40,6 +44,7 @@ import
software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.sqs.SqsClient;
import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
@@ -60,7 +65,7 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
private Collection<String> attributeNames;
private Collection<String> messageAttributeNames;
- public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) throws
NoFactoryAvailableException {
+ public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) {
super(endpoint, processor);
if (getConfiguration().getAttributeNames() != null) {
@@ -136,7 +141,7 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
Queue<Exchange> answer = new LinkedList<>();
for (software.amazon.awssdk.services.sqs.model.Message message :
messages) {
- Exchange exchange = getEndpoint().createExchange(message);
+ Exchange exchange = createExchange(message);
answer.add(exchange);
}
@@ -282,6 +287,47 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
return (Sqs2Endpoint) super.getEndpoint();
}
+ public Exchange
createExchange(software.amazon.awssdk.services.sqs.model.Message msg) {
+ return createExchange(getEndpoint().getExchangePattern(), msg);
+ }
+
+ private Exchange createExchange(ExchangePattern pattern,
software.amazon.awssdk.services.sqs.model.Message msg) {
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(pattern);
+ Message message = exchange.getIn();
+ message.setBody(msg.body());
+ message.setHeaders(new HashMap<>(msg.attributesAsStrings()));
+ message.setHeader(Sqs2Constants.MESSAGE_ID, msg.messageId());
+ message.setHeader(Sqs2Constants.MD5_OF_BODY, msg.md5OfBody());
+ message.setHeader(Sqs2Constants.RECEIPT_HANDLE, msg.receiptHandle());
+ message.setHeader(Sqs2Constants.ATTRIBUTES, msg.attributes());
+ message.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES,
msg.messageAttributes());
+
+ // Need to apply the SqsHeaderFilterStrategy this time
+ HeaderFilterStrategy headerFilterStrategy =
getEndpoint().getHeaderFilterStrategy();
+ // add all sqs message attributes as camel message headers so that
+ // knowledge of
+ // the Sqs class MessageAttributeValue will not leak to the client
+ for (Map.Entry<String, MessageAttributeValue> entry :
msg.messageAttributes().entrySet()) {
+ String header = entry.getKey();
+ Object value = translateValue(entry.getValue());
+ if (!headerFilterStrategy.applyFilterToExternalHeaders(header,
value, exchange)) {
+ message.setHeader(header, value);
+ }
+ }
+ return exchange;
+ }
+
+ private static Object translateValue(MessageAttributeValue mav) {
+ Object result = null;
+ if (mav.stringValue() != null) {
+ result = mav.stringValue();
+ } else if (mav.binaryValue() != null) {
+ result = mav.binaryValue();
+ }
+ return result;
+ }
+
@Override
public String toString() {
if (sqsConsumerToString == null) {
diff --git
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
index 2dc209c..1eb4014 100644
---
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
+++
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
@@ -21,13 +21,9 @@ import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.aws2.sqs.client.Sqs2ClientFactory;
@@ -52,7 +48,6 @@ import
software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
-import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
@@ -343,36 +338,6 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint
implements HeaderFilterS
super.doStop();
}
- public Exchange
createExchange(software.amazon.awssdk.services.sqs.model.Message msg) {
- return createExchange(getExchangePattern(), msg);
- }
-
- private Exchange createExchange(ExchangePattern pattern,
software.amazon.awssdk.services.sqs.model.Message msg) {
- Exchange exchange = super.createExchange(pattern);
- Message message = exchange.getIn();
- message.setBody(msg.body());
- message.setHeaders(new HashMap<>(msg.attributesAsStrings()));
- message.setHeader(Sqs2Constants.MESSAGE_ID, msg.messageId());
- message.setHeader(Sqs2Constants.MD5_OF_BODY, msg.md5OfBody());
- message.setHeader(Sqs2Constants.RECEIPT_HANDLE, msg.receiptHandle());
- message.setHeader(Sqs2Constants.ATTRIBUTES, msg.attributes());
- message.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES,
msg.messageAttributes());
-
- // Need to apply the SqsHeaderFilterStrategy this time
- HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy();
- // add all sqs message attributes as camel message headers so that
- // knowledge of
- // the Sqs class MessageAttributeValue will not leak to the client
- for (Entry<String, MessageAttributeValue> entry :
msg.messageAttributes().entrySet()) {
- String header = entry.getKey();
- Object value = translateValue(entry.getValue());
- if (!headerFilterStrategy.applyFilterToExternalHeaders(header,
value, exchange)) {
- message.setHeader(header, value);
- }
- }
- return exchange;
- }
-
public Sqs2Configuration getConfiguration() {
return configuration;
}
@@ -406,13 +371,4 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint
implements HeaderFilterS
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
- private Object translateValue(MessageAttributeValue mav) {
- Object result = null;
- if (mav.stringValue() != null) {
- result = mav.stringValue();
- } else if (mav.binaryValue() != null) {
- result = mav.binaryValue();
- }
- return result;
- }
}