This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch objectHelper in repository https://gitbox.apache.org/repos/asf/camel.git
commit 247d5a7706e36e29e7b987a014d85dd4d73fe8a3 Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Jan 28 11:45:12 2026 +0100 Camel-AWS components: Use ObjectHelper for null checks - SQS Signed-off-by: Andrea Cosentino <[email protected]> --- .../camel/component/aws2/sqs/Sqs2Component.java | 3 +- .../camel/component/aws2/sqs/Sqs2Consumer.java | 14 +++--- .../camel/component/aws2/sqs/Sqs2Endpoint.java | 56 +++++++++++----------- .../camel/component/aws2/sqs/Sqs2Producer.java | 34 ++++++------- 4 files changed, 54 insertions(+), 53 deletions(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Component.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Component.java index 16e35cbafd63..7b716fd7238d 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Component.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Component.java @@ -23,6 +23,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.HealthCheckComponent; +import org.apache.camel.util.ObjectHelper; import software.amazon.awssdk.regions.Region; /** @@ -47,7 +48,7 @@ public class Sqs2Component extends HealthCheckComponent { if (remaining == null || remaining.isBlank()) { throw new IllegalArgumentException("Queue name must be specified."); } - Sqs2Configuration configuration = this.configuration != null ? this.configuration.copy() : new Sqs2Configuration(); + Sqs2Configuration configuration = ObjectHelper.isNotEmpty(this.configuration) ? this.configuration.copy() : new Sqs2Configuration(); if (remaining.startsWith("arn:")) { String[] parts = remaining.split(":"); if (parts.length != 6 || !parts[2].equals("sqs")) { diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index d50380bb362a..1744dbfc96b5 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -123,7 +123,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { Queue<Exchange> answer = new LinkedList<>(); for (software.amazon.awssdk.services.sqs.model.Message message : messages) { - if (message != null) { + if (ObjectHelper.isNotEmpty(message)) { Exchange exchange = createExchange(message); answer.add(exchange); } @@ -228,7 +228,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { */ protected void processRollback(Exchange exchange) { Exception cause = exchange.getException(); - if (cause != null) { + if (ObjectHelper.isNotEmpty(cause)) { getExceptionHandler().handleException( "Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause); } @@ -321,7 +321,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { Integer visibilityTimeout = getConfiguration().getVisibilityTimeout(); - if (visibilityTimeout != null && visibilityTimeout > 0) { + if (ObjectHelper.isNotEmpty(visibilityTimeout) && visibilityTimeout > 0) { int delay = Math.max(1, visibilityTimeout / 2); this.timeoutExtender = new TimeoutExtender(visibilityTimeout, delay); @@ -340,21 +340,21 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { @Override protected void doShutdown() throws Exception { - if (timeoutExtender != null) { + if (ObjectHelper.isNotEmpty(timeoutExtender)) { timeoutExtender.cancel(); timeoutExtender = null; } - if (scheduledFuture != null) { + if (ObjectHelper.isNotEmpty(scheduledFuture)) { scheduledFuture.cancel(true); scheduledFuture = null; } - if (scheduledExecutor != null) { + if (ObjectHelper.isNotEmpty(scheduledExecutor)) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor); scheduledExecutor = null; } - if (pollingTask != null) { + if (ObjectHelper.isNotEmpty(pollingTask)) { pollingTask.close(); pollingTask = null; } diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java index 2883f097b752..b5b2e08d61f7 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java @@ -147,15 +147,15 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS protected void doInit() throws Exception { super.doInit(); - client = configuration.getAmazonSQSClient() != null + client = ObjectHelper.isNotEmpty(configuration.getAmazonSQSClient()) ? configuration.getAmazonSQSClient() : Sqs2ClientFactory.getSqsClient(configuration); // check the setting the headerFilterStrategy - if (headerFilterStrategy == null) { + if (ObjectHelper.isEmpty(headerFilterStrategy)) { headerFilterStrategy = new Sqs2HeaderFilterStrategy(); } - if (configuration.getQueueUrl() != null) { + if (ObjectHelper.isNotEmpty(configuration.getQueueUrl())) { queueUrl = configuration.getQueueUrl(); queueUrlInitialized = true; } else { @@ -163,11 +163,11 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS // built manually. // This allows accessing queues where you don't have permission to // list queues or query queues - if (configuration.getRegion() != null && configuration.getQueueOwnerAWSAccountId() != null) { + if (ObjectHelper.isNotEmpty(configuration.getRegion()) && ObjectHelper.isNotEmpty(configuration.getQueueOwnerAWSAccountId())) { queueUrl = getAwsEndpointUri() + "/" + configuration.getQueueOwnerAWSAccountId() + "/" + configuration.getQueueName(); queueUrlInitialized = true; - } else if (configuration.getQueueOwnerAWSAccountId() != null) { + } else if (ObjectHelper.isNotEmpty(configuration.getQueueOwnerAWSAccountId())) { GetQueueUrlRequest.Builder getQueueUrlRequest = GetQueueUrlRequest.builder(); getQueueUrlRequest.queueName(configuration.getQueueName()); getQueueUrlRequest.queueOwnerAWSAccountId(configuration.getQueueOwnerAWSAccountId()); @@ -179,7 +179,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS } } - if (queueUrl == null && configuration.isAutoCreateQueue()) { + if (ObjectHelper.isEmpty(queueUrl) && configuration.isAutoCreateQueue()) { createQueue(client); } else { LOG.debug("Using Amazon SQS queue url: {}", queueUrl); @@ -203,13 +203,13 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS } } - if (queueUrl != null) { + if (ObjectHelper.isNotEmpty(queueUrl)) { queueUrlInitialized = true; break; } String token = listQueuesResult.nextToken(); - if (token == null) { + if (ObjectHelper.isEmpty(token)) { break; } @@ -253,38 +253,38 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS = getConfiguration().getMessageDeduplicationIdStrategy() instanceof NullMessageDeduplicationIdStrategy; attributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, String.valueOf(useContentBasedDeduplication)); } - if (getConfiguration().getDefaultVisibilityTimeout() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getDefaultVisibilityTimeout())) { attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); } - if (getConfiguration().getMaximumMessageSize() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getMaximumMessageSize())) { attributes.put(QueueAttributeName.MAXIMUM_MESSAGE_SIZE, String.valueOf(getConfiguration().getMaximumMessageSize())); } - if (getConfiguration().getMessageRetentionPeriod() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getMessageRetentionPeriod())) { attributes.put(QueueAttributeName.MESSAGE_RETENTION_PERIOD, String.valueOf(getConfiguration().getMessageRetentionPeriod())); } - if (getConfiguration().getPolicy() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getPolicy())) { InputStream s = ResourceHelper.resolveMandatoryResourceAsInputStream(this.getCamelContext(), getConfiguration().getPolicy()); String policy = IOUtils.toString(s, Charset.defaultCharset()); attributes.put(QueueAttributeName.POLICY, policy); } - if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getReceiveMessageWaitTimeSeconds())) { attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds())); } - if (getConfiguration().getDelaySeconds() != null && getConfiguration().isDelayQueue()) { + if (ObjectHelper.isNotEmpty(getConfiguration().getDelaySeconds()) && getConfiguration().isDelayQueue()) { attributes.put(QueueAttributeName.DELAY_SECONDS, String.valueOf(getConfiguration().getDelaySeconds())); } - if (getConfiguration().getRedrivePolicy() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getRedrivePolicy())) { attributes.put(QueueAttributeName.REDRIVE_POLICY, getConfiguration().getRedrivePolicy()); } if (getConfiguration().isServerSideEncryptionEnabled()) { - if (getConfiguration().getKmsMasterKeyId() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getKmsMasterKeyId())) { attributes.put(QueueAttributeName.KMS_MASTER_KEY_ID, getConfiguration().getKmsMasterKeyId()); } - if (getConfiguration().getKmsDataKeyReusePeriodSeconds() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getKmsDataKeyReusePeriodSeconds())) { attributes.put(QueueAttributeName.KMS_DATA_KEY_REUSE_PERIOD_SECONDS, String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds())); } @@ -310,38 +310,38 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS private void updateQueueAttributes(SqsClient client) throws IOException { SetQueueAttributesRequest.Builder request = SetQueueAttributesRequest.builder().queueUrl(queueUrl); Map<QueueAttributeName, String> attributes = new EnumMap<>(QueueAttributeName.class); - if (getConfiguration().getDefaultVisibilityTimeout() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getDefaultVisibilityTimeout())) { attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); } - if (getConfiguration().getMaximumMessageSize() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getMaximumMessageSize())) { attributes.put(QueueAttributeName.MAXIMUM_MESSAGE_SIZE, String.valueOf(getConfiguration().getMaximumMessageSize())); } - if (getConfiguration().getMessageRetentionPeriod() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getMessageRetentionPeriod())) { attributes.put(QueueAttributeName.MESSAGE_RETENTION_PERIOD, String.valueOf(getConfiguration().getMessageRetentionPeriod())); } - if (getConfiguration().getPolicy() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getPolicy())) { InputStream s = ResourceHelper.resolveMandatoryResourceAsInputStream(this.getCamelContext(), getConfiguration().getPolicy()); String policy = IOUtils.toString(s, Charset.defaultCharset()); attributes.put(QueueAttributeName.POLICY, policy); } - if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getReceiveMessageWaitTimeSeconds())) { attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds())); } - if (getConfiguration().getDelaySeconds() != null && getConfiguration().isDelayQueue()) { + if (ObjectHelper.isNotEmpty(getConfiguration().getDelaySeconds()) && getConfiguration().isDelayQueue()) { attributes.put(QueueAttributeName.DELAY_SECONDS, String.valueOf(getConfiguration().getDelaySeconds())); } - if (getConfiguration().getRedrivePolicy() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getRedrivePolicy())) { attributes.put(QueueAttributeName.REDRIVE_POLICY, getConfiguration().getRedrivePolicy()); } if (getConfiguration().isServerSideEncryptionEnabled()) { - if (getConfiguration().getKmsMasterKeyId() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getKmsMasterKeyId())) { attributes.put(QueueAttributeName.KMS_MASTER_KEY_ID, getConfiguration().getKmsMasterKeyId()); } - if (getConfiguration().getKmsDataKeyReusePeriodSeconds() != null) { + if (ObjectHelper.isNotEmpty(getConfiguration().getKmsDataKeyReusePeriodSeconds())) { attributes.put(QueueAttributeName.KMS_DATA_KEY_REUSE_PERIOD_SECONDS, String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds())); } @@ -357,7 +357,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS @Override public void doStop() throws Exception { if (ObjectHelper.isEmpty(configuration.getAmazonSQSClient())) { - if (client != null) { + if (ObjectHelper.isNotEmpty(client)) { client.close(); } } @@ -427,7 +427,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS @Override public Map<String, String> getServiceMetadata() { HashMap<String, String> metadata = new HashMap<>(); - if (configuration.getQueueName() != null) { + if (ObjectHelper.isNotEmpty(configuration.getQueueName())) { metadata.put("queueName", configuration.getQueueName()); } return metadata; diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java index 34c0f01e0144..e58ad11428b2 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java @@ -136,9 +136,9 @@ public class Sqs2Producer extends DefaultProducer { Message message = getMessageForResponse(exchange); message.setBody(result); message.setHeader(Sqs2Constants.FAILED_MESSAGE_COUNT, - result.failed() != null ? result.failed().size() : 0); + ObjectHelper.isNotEmpty(result.failed()) ? result.failed().size() : 0); message.setHeader(Sqs2Constants.SUCCESSFUL_MESSAGE_COUNT, - result.successful() != null ? result.successful().size() : 0); + ObjectHelper.isNotEmpty(result.successful()) ? result.successful().size() : 0); } else if (exchange.getIn().getBody() instanceof String) { String c = exchange.getIn().getBody(String.class); String[] elements = c.split(getConfiguration().getBatchSeparator()); @@ -156,18 +156,18 @@ public class Sqs2Producer extends DefaultProducer { Message message = getMessageForResponse(exchange); message.setBody(result); message.setHeader(Sqs2Constants.FAILED_MESSAGE_COUNT, - result.failed() != null ? result.failed().size() : 0); + ObjectHelper.isNotEmpty(result.failed()) ? result.failed().size() : 0); message.setHeader(Sqs2Constants.SUCCESSFUL_MESSAGE_COUNT, - result.successful() != null ? result.successful().size() : 0); + ObjectHelper.isNotEmpty(result.successful()) ? result.successful().size() : 0); } else { SendMessageBatchRequest req = exchange.getIn().getBody(SendMessageBatchRequest.class); SendMessageBatchResponse result = amazonSQS.sendMessageBatch(req); Message message = getMessageForResponse(exchange); message.setBody(result); message.setHeader(Sqs2Constants.FAILED_MESSAGE_COUNT, - result.failed() != null ? result.failed().size() : 0); + ObjectHelper.isNotEmpty(result.failed()) ? result.failed().size() : 0); message.setHeader(Sqs2Constants.SUCCESSFUL_MESSAGE_COUNT, - result.successful() != null ? result.successful().size() : 0); + ObjectHelper.isNotEmpty(result.successful()) ? result.successful().size() : 0); } } @@ -187,22 +187,22 @@ public class Sqs2Producer extends DefaultProducer { private void listQueues(SqsClient amazonSQS, Exchange exchange) { ListQueuesRequest.Builder request = ListQueuesRequest.builder(); String prefix = exchange.getIn().getHeader(Sqs2Constants.SQS_QUEUE_PREFIX, String.class); - if (prefix != null) { + if (ObjectHelper.isNotEmpty(prefix)) { request.queueNamePrefix(prefix); } String nextToken = exchange.getIn().getHeader(Sqs2Constants.NEXT_TOKEN, String.class); - if (nextToken != null) { + if (ObjectHelper.isNotEmpty(nextToken)) { request.nextToken(nextToken); } Integer maxResults = exchange.getIn().getHeader(Sqs2Constants.MAX_RESULTS, Integer.class); - if (maxResults != null) { + if (ObjectHelper.isNotEmpty(maxResults)) { request.maxResults(maxResults); } ListQueuesResponse result = amazonSQS.listQueues(request.build()); Message message = getMessageForResponse(exchange); message.setBody(result); message.setHeader(Sqs2Constants.NEXT_TOKEN, result.nextToken()); - message.setHeader(Sqs2Constants.IS_TRUNCATED, result.nextToken() != null); + message.setHeader(Sqs2Constants.IS_TRUNCATED, ObjectHelper.isNotEmpty(result.nextToken())); } private void purgeQueue(SqsClient amazonSQS, Exchange exchange) { @@ -258,7 +258,7 @@ public class Sqs2Producer extends DefaultProducer { private void addDelay(SendMessageRequest.Builder request, Exchange exchange) { Integer headerValue = exchange.getIn().getHeader(Sqs2Constants.DELAY_HEADER, Integer.class); Integer delayValue; - if (headerValue == null) { + if (ObjectHelper.isEmpty(headerValue)) { LOG.trace("Using the config delay"); delayValue = getEndpoint().getConfiguration().getDelaySeconds(); } else { @@ -266,7 +266,7 @@ public class Sqs2Producer extends DefaultProducer { delayValue = headerValue; } LOG.trace("found delay: {}", delayValue); - if (delayValue != null) { + if (ObjectHelper.isNotEmpty(delayValue)) { request.delaySeconds(delayValue); } } @@ -274,7 +274,7 @@ public class Sqs2Producer extends DefaultProducer { private void addDelay(SendMessageBatchRequestEntry.Builder request, Exchange exchange) { Integer headerValue = exchange.getIn().getHeader(Sqs2Constants.DELAY_HEADER, Integer.class); Integer delayValue; - if (headerValue == null) { + if (ObjectHelper.isEmpty(headerValue)) { LOG.trace("Using the config delay"); delayValue = getEndpoint().getConfiguration().getDelaySeconds(); } else { @@ -282,7 +282,7 @@ public class Sqs2Producer extends DefaultProducer { delayValue = headerValue; } LOG.trace("found delay: {}", delayValue); - if (delayValue != null) { + if (ObjectHelper.isNotEmpty(delayValue)) { request.delaySeconds(delayValue); } } @@ -306,7 +306,7 @@ public class Sqs2Producer extends DefaultProducer { @Override public String toString() { - if (sqsProducerToString == null) { + if (ObjectHelper.isEmpty(sqsProducerToString)) { sqsProducerToString = "SqsProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; } return sqsProducerToString; @@ -322,7 +322,7 @@ public class Sqs2Producer extends DefaultProducer { // We are going to put the first MAX_ATTRIBUTES headers, because this is the maximum Attributes an SQS Message could accept if (result.size() < MAX_ATTRIBUTES) { MessageAttributeValue mav = Sqs2MessageHelper.toMessageAttributeValue(entry.getValue()); - if (mav != null) { + if (ObjectHelper.isNotEmpty(mav)) { result.put(entry.getKey(), mav); } } else { @@ -349,7 +349,7 @@ public class Sqs2Producer extends DefaultProducer { private Sqs2Operations determineOperation(Exchange exchange) { Sqs2Operations operation = exchange.getIn().getHeader(Sqs2Constants.SQS_OPERATION, Sqs2Operations.class); - if (operation == null) { + if (ObjectHelper.isEmpty(operation)) { operation = getConfiguration().getOperation(); } return operation;
