This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 53eb0e5f458d2d107f0c125c705ef46211d8fd9b Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Jan 28 10:28:58 2026 +0100 Camel-AWS components: Use ObjectHelper for null checks - Kinesis Signed-off-by: Andrea Cosentino <[email protected]> --- .../component/aws2/kinesis/Kinesis2Component.java | 3 ++- .../component/aws2/kinesis/Kinesis2Consumer.java | 20 ++++++++++---------- .../component/aws2/kinesis/Kinesis2Endpoint.java | 9 ++++----- .../component/aws2/kinesis/Kinesis2Producer.java | 4 ++-- .../component/aws2/kinesis/KinesisConnection.java | 14 +++++++------- 5 files changed, 25 insertions(+), 25 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java index 2fa390acc201..aef81b12aba6 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java @@ -24,6 +24,7 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.HealthCheckComponent; import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ObjectHelper; @Component("aws2-kinesis") public class Kinesis2Component extends HealthCheckComponent { @@ -44,7 +45,7 @@ public class Kinesis2Component extends HealthCheckComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { Kinesis2Configuration configuration - = this.configuration != null ? this.configuration.copy() : new Kinesis2Configuration(); + = ObjectHelper.isNotEmpty(this.configuration) ? this.configuration.copy() : new Kinesis2Configuration(); configuration.setStreamName(remaining); Kinesis2Endpoint endpoint = new Kinesis2Endpoint(uri, configuration, this); setProperties(endpoint, parameters); diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 67cd2cca8908..59dd0817e348 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -162,7 +162,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R throw new RuntimeException(e); } - if (shardIterator == null) { + if (ObjectHelper.isEmpty(shardIterator)) { // Unable to get an interator so shard must be closed processedExchangeCount.set(0); return; @@ -241,7 +241,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R var shardId = shard.shardId(); - if (currentShardIterators.get(shardId) == null) { + if (ObjectHelper.isEmpty(currentShardIterators.get(shardId))) { if (currentShardIterators.containsKey(shardId)) { // There was previously a shardIterator but shard is now closed handleClosedShard(shardId); @@ -312,12 +312,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R } private void resume(String shardId, GetShardIteratorRequest.Builder req) { - if (resumeStrategy == null) { + if (ObjectHelper.isEmpty(resumeStrategy)) { return; } ResumeActionAware adapter = resumeStrategy.getAdapter(ResumeActionAware.class); - if (adapter == null) { + if (ObjectHelper.isEmpty(adapter)) { LOG.warn("There is a resume strategy setup, but no adapter configured or the type is incorrect"); return; @@ -332,7 +332,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R KinesisResumeAction action = getEndpoint().getCamelContext().getRegistry().lookupByNameAndType(Kinesis2Constants.RESUME_ACTION, KinesisResumeAction.class); - if (action == null) { + if (ObjectHelper.isEmpty(action)) { action = new KinesisResumeAction(req); } else { action.setBuilder(req); @@ -359,7 +359,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, dataRecord.partitionKey()); exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, dataRecord.sequenceNumber()); exchange.getIn().setHeader(Kinesis2Constants.SHARD_ID, shard.shardId()); - if (dataRecord.approximateArrivalTimestamp() != null) { + if (ObjectHelper.isNotEmpty(dataRecord.approximateArrivalTimestamp())) { long ts = dataRecord.approximateArrivalTimestamp().getEpochSecond() * 1000; exchange.getIn().setHeader(Kinesis2Constants.MESSAGE_TIMESTAMP, ts); } @@ -388,7 +388,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R } private Instant parseMessageTimestamp(String messageTimestamp) { - if (messageTimestamp == null) { + if (ObjectHelper.isEmpty(messageTimestamp)) { throw new IllegalArgumentException("Timestamp can't be null"); } // Milliseconds format @@ -423,14 +423,14 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R this.shardMonitorExecutor.scheduleAtFixedRate(new ShardMonitor(), 0, getConfiguration().getShardMonitorInterval(), TimeUnit.MILLISECONDS); - if (resumeStrategy != null) { + if (ObjectHelper.isNotEmpty(resumeStrategy)) { resumeStrategy.loadCache(); } } @Override protected void doStop() throws Exception { - if (this.shardMonitorExecutor != null) { + if (ObjectHelper.isNotEmpty(this.shardMonitorExecutor)) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(this.shardMonitorExecutor); this.shardMonitorExecutor = null; } @@ -455,7 +455,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R public void run() { try { List<Shard> latestShardList = getShardList(connection); - if (latestShardList != null) { + if (ObjectHelper.isNotEmpty(latestShardList)) { setCurrentShardList(latestShardList); } } catch (Exception e) { diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java index ca493afb5534..7bef9ecfb57e 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java @@ -17,7 +17,6 @@ package org.apache.camel.component.aws2.kinesis; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ExecutorService; import org.apache.camel.Category; @@ -64,7 +63,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint implements EndpointS } if (configuration.isAsyncClient() && - Objects.isNull(configuration.getAmazonKinesisAsyncClient())) { + ObjectHelper.isEmpty(configuration.getAmazonKinesisAsyncClient())) { kinesisAsyncClient = kinesisConnection.getAsyncClient(this); } else { kinesisClient = kinesisConnection.getClient(this); @@ -81,9 +80,9 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint implements EndpointS @Override public void doStop() throws Exception { if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) { - if (kinesisClient != null) { + if (ObjectHelper.isNotEmpty(kinesisClient)) { kinesisClient.close(); - } else if (Objects.nonNull(kinesisAsyncClient)) { + } else if (ObjectHelper.isNotEmpty(kinesisAsyncClient)) { kinesisAsyncClient.close(); } } @@ -151,7 +150,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint implements EndpointS @Override public Map<String, String> getServiceMetadata() { - if (configuration.getStreamName() != null) { + if (ObjectHelper.isNotEmpty(configuration.getStreamName())) { return Map.of("stream", configuration.getStreamName()); } return null; diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java index 45594f657a56..54a8f884cadc 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java @@ -151,14 +151,14 @@ public class Kinesis2Producer extends DefaultProducer { putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName()); ensurePartitionKeyNotNull(partitionKey); putRecordRequest.partitionKey(partitionKey.toString()); - if (sequenceNumber != null) { + if (ObjectHelper.isNotEmpty(sequenceNumber)) { putRecordRequest.sequenceNumberForOrdering(sequenceNumber.toString()); } return putRecordRequest.build(); } private void ensurePartitionKeyNotNull(Object partitionKey) { - if (partitionKey == null) { + if (ObjectHelper.isEmpty(partitionKey)) { throw new IllegalArgumentException("Partition key must be specified"); } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java index b02cab0b8b7e..b9e888f3728e 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java @@ -18,11 +18,11 @@ package org.apache.camel.component.aws2.kinesis; import java.io.Closeable; import java.io.IOException; -import java.util.Objects; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory; +import org.apache.camel.util.ObjectHelper; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisClient; @@ -38,8 +38,8 @@ public class KinesisConnection implements Closeable { public KinesisClient getClient(final Kinesis2Endpoint endpoint) { lock.lock(); try { - if (Objects.isNull(kinesisClient)) { - kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null + if (ObjectHelper.isEmpty(kinesisClient)) { + kinesisClient = ObjectHelper.isNotEmpty(endpoint.getConfiguration().getAmazonKinesisClient()) ? endpoint.getConfiguration().getAmazonKinesisClient() : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration()); } @@ -52,8 +52,8 @@ public class KinesisConnection implements Closeable { public KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) { lock.lock(); try { - if (Objects.isNull(kinesisAsyncClient)) { - kinesisAsyncClient = endpoint.getConfiguration().getAmazonKinesisAsyncClient() != null + if (ObjectHelper.isEmpty(kinesisAsyncClient)) { + kinesisAsyncClient = ObjectHelper.isNotEmpty(endpoint.getConfiguration().getAmazonKinesisAsyncClient()) ? endpoint.getConfiguration().getAmazonKinesisAsyncClient() : KinesisClientFactory.getKinesisAsyncClient(endpoint.getConfiguration()); } @@ -73,10 +73,10 @@ public class KinesisConnection implements Closeable { @Override public void close() throws IOException { - if (kinesisClient != null) { + if (ObjectHelper.isNotEmpty(kinesisClient)) { kinesisClient.close(); } - if (kinesisAsyncClient != null) { + if (ObjectHelper.isNotEmpty(kinesisAsyncClient)) { kinesisAsyncClient.close(); } }
