This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch 22805-2 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2d0508e381cdc7cd74ec30a8994f389b7ab85208 Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Jan 12 10:58:50 2026 +0100 CAMEL-22836 - Camel-AWS components: Avoid duplicated code and add pagination to producer operation where it makes sense - AWS EC2 Signed-off-by: Andrea Cosentino <[email protected]> --- .../apache/camel/catalog/components/aws2-ec2.json | 5 +- .../apache/camel/component/aws2/ec2/aws2-ec2.json | 5 +- .../camel/component/aws2/ec2/AWS2EC2Constants.java | 11 ++ .../camel/component/aws2/ec2/AWS2EC2Producer.java | 192 ++++++++++++++------- .../dsl/AWS2EC2EndpointBuilderFactory.java | 36 ++++ 5 files changed, 186 insertions(+), 63 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-ec2.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-ec2.json index e0e885315f31..ef6345ca2e9c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-ec2.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-ec2.json @@ -62,7 +62,10 @@ "CamelAwsEC2InstancesClientToken": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Unique, case-sensitive identifier you provide to ensure the idempotency of the request.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#INSTANCES_CLIENT_TOKEN" }, "CamelAwsEC2InstancesPlacement": { "index": 12, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.services.ec2.model.Placement", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The placement for the instance.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#INSTANCES_PLACEMENT" }, "CamelAwsEC2InstancesTags": { "index": 13, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Collection<Tag>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A collection of tags to add or remove from EC2 resources", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#INSTANCES_TAGS" }, - "CamelAwsEC2SubnetId": { "index": 14, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID of the subnet to launch the instance into.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#SUBNET_ID" } + "CamelAwsEC2SubnetId": { "index": 14, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID of the subnet to launch the instance into.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#SUBNET_ID" }, + "CamelAwsEC2NextToken": { "index": 15, "kind": "header", "displayName": "", "group": "describeInstances describeInstancesStatus", "label": "describeInstances describeInstancesStatus", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The token for the next set of results.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#NEXT_TOKEN" }, + "CamelAwsEC2MaxResults": { "index": 16, "kind": "header", "displayName": "", "group": "describeInstances describeInstancesStatus", "label": "describeInstances describeInstancesStatus", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The maximum number of results to return.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#MAX_RESULTS" }, + "CamelAwsEC2IsTruncated": { "index": 17, "kind": "header", "displayName": "", "group": "describeInstances describeInstancesStatus", "label": "describeInstances describeInstancesStatus", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the response has more results (is truncated).", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#IS_TRUNCATED" } }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ec2.AWS2EC2Configuration", "configurationField": "configuration", "description": "Logical name" }, diff --git a/components/camel-aws/camel-aws2-ec2/src/generated/resources/META-INF/org/apache/camel/component/aws2/ec2/aws2-ec2.json b/components/camel-aws/camel-aws2-ec2/src/generated/resources/META-INF/org/apache/camel/component/aws2/ec2/aws2-ec2.json index e0e885315f31..ef6345ca2e9c 100644 --- a/components/camel-aws/camel-aws2-ec2/src/generated/resources/META-INF/org/apache/camel/component/aws2/ec2/aws2-ec2.json +++ b/components/camel-aws/camel-aws2-ec2/src/generated/resources/META-INF/org/apache/camel/component/aws2/ec2/aws2-ec2.json @@ -62,7 +62,10 @@ "CamelAwsEC2InstancesClientToken": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Unique, case-sensitive identifier you provide to ensure the idempotency of the request.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#INSTANCES_CLIENT_TOKEN" }, "CamelAwsEC2InstancesPlacement": { "index": 12, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.services.ec2.model.Placement", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The placement for the instance.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#INSTANCES_PLACEMENT" }, "CamelAwsEC2InstancesTags": { "index": 13, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Collection<Tag>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A collection of tags to add or remove from EC2 resources", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#INSTANCES_TAGS" }, - "CamelAwsEC2SubnetId": { "index": 14, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID of the subnet to launch the instance into.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#SUBNET_ID" } + "CamelAwsEC2SubnetId": { "index": 14, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID of the subnet to launch the instance into.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#SUBNET_ID" }, + "CamelAwsEC2NextToken": { "index": 15, "kind": "header", "displayName": "", "group": "describeInstances describeInstancesStatus", "label": "describeInstances describeInstancesStatus", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The token for the next set of results.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#NEXT_TOKEN" }, + "CamelAwsEC2MaxResults": { "index": 16, "kind": "header", "displayName": "", "group": "describeInstances describeInstancesStatus", "label": "describeInstances describeInstancesStatus", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The maximum number of results to return.", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#MAX_RESULTS" }, + "CamelAwsEC2IsTruncated": { "index": 17, "kind": "header", "displayName": "", "group": "describeInstances describeInstancesStatus", "label": "describeInstances describeInstancesStatus", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the response has more results (is truncated).", "constantName": "org.apache.camel.component.aws2.ec2.AWS2EC2Constants#IS_TRUNCATED" } }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ec2.AWS2EC2Configuration", "configurationField": "configuration", "description": "Logical name" }, diff --git a/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Constants.java b/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Constants.java index 263ac736ff79..fa4df10adecb 100644 --- a/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Constants.java +++ b/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Constants.java @@ -57,4 +57,15 @@ public interface AWS2EC2Constants { String INSTANCES_TAGS = "CamelAwsEC2InstancesTags"; @Metadata(description = "The ID of the subnet to launch the instance into.", javaType = "String") String SUBNET_ID = "CamelAwsEC2SubnetId"; + + // Pagination constants + @Metadata(label = "describeInstances describeInstancesStatus", + description = "The token for the next set of results.", javaType = "String") + String NEXT_TOKEN = "CamelAwsEC2NextToken"; + @Metadata(label = "describeInstances describeInstancesStatus", + description = "The maximum number of results to return.", javaType = "Integer") + String MAX_RESULTS = "CamelAwsEC2MaxResults"; + @Metadata(label = "describeInstances describeInstancesStatus", + description = "Whether the response has more results (is truncated).", javaType = "Boolean") + String IS_TRUNCATED = "CamelAwsEC2IsTruncated"; } diff --git a/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Producer.java b/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Producer.java index b84f87b90d1b..7b39b752d3f0 100644 --- a/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Producer.java +++ b/components/camel-aws/camel-aws2-ec2/src/main/java/org/apache/camel/component/aws2/ec2/AWS2EC2Producer.java @@ -18,6 +18,9 @@ package org.apache.camel.component.aws2.ec2; import java.util.Arrays; import java.util.Collection; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -360,71 +363,62 @@ public class AWS2EC2Producer extends DefaultProducer { @SuppressWarnings("unchecked") private void describeInstances(Ec2Client ec2Client, Exchange exchange) throws InvalidPayloadException { - Collection<String> instanceIds; - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof DescribeInstancesRequest) { - DescribeInstancesResponse result; - try { - result = ec2Client.describeInstances((DescribeInstancesRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("Describe Instances command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - DescribeInstancesRequest.Builder builder = DescribeInstancesRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(AWS2EC2Constants.INSTANCES_IDS))) { - instanceIds = exchange.getIn().getHeader(AWS2EC2Constants.INSTANCES_IDS, Collection.class); - builder.instanceIds(instanceIds); - } - DescribeInstancesResponse result; - try { - result = ec2Client.describeInstances(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Describe Instances command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + DescribeInstancesRequest.class, + ec2Client::describeInstances, + () -> { + DescribeInstancesRequest.Builder builder = DescribeInstancesRequest.builder(); + Collection<String> instanceIds + = getOptionalHeader(exchange, AWS2EC2Constants.INSTANCES_IDS, Collection.class); + if (instanceIds != null) { + builder.instanceIds(instanceIds); + } + String nextToken = getOptionalHeader(exchange, AWS2EC2Constants.NEXT_TOKEN, String.class); + if (nextToken != null) { + builder.nextToken(nextToken); + } + Integer maxResults = getOptionalHeader(exchange, AWS2EC2Constants.MAX_RESULTS, Integer.class); + if (maxResults != null) { + builder.maxResults(maxResults); + } + return ec2Client.describeInstances(builder.build()); + }, + "Describe Instances", + (DescribeInstancesResponse response, Message message) -> { + message.setHeader(AWS2EC2Constants.NEXT_TOKEN, response.nextToken()); + message.setHeader(AWS2EC2Constants.IS_TRUNCATED, response.nextToken() != null); + }); } @SuppressWarnings("unchecked") private void describeInstancesStatus(Ec2Client ec2Client, Exchange exchange) throws InvalidPayloadException { - Collection<String> instanceIds; - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof DescribeInstanceStatusRequest) { - DescribeInstanceStatusResponse result; - try { - result = ec2Client.describeInstanceStatus((DescribeInstanceStatusRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("Describe Instances Status command returned the error code {}", - ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - DescribeInstanceStatusRequest.Builder builder = DescribeInstanceStatusRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(AWS2EC2Constants.INSTANCES_IDS))) { - instanceIds = exchange.getIn().getHeader(AWS2EC2Constants.INSTANCES_IDS, Collection.class); - builder.instanceIds(instanceIds); - } - DescribeInstanceStatusResponse result; - try { - result = ec2Client.describeInstanceStatus(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Describe Instances Status command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + DescribeInstanceStatusRequest.class, + ec2Client::describeInstanceStatus, + () -> { + DescribeInstanceStatusRequest.Builder builder = DescribeInstanceStatusRequest.builder(); + Collection<String> instanceIds + = getOptionalHeader(exchange, AWS2EC2Constants.INSTANCES_IDS, Collection.class); + if (instanceIds != null) { + builder.instanceIds(instanceIds); + } + String nextToken = getOptionalHeader(exchange, AWS2EC2Constants.NEXT_TOKEN, String.class); + if (nextToken != null) { + builder.nextToken(nextToken); + } + Integer maxResults = getOptionalHeader(exchange, AWS2EC2Constants.MAX_RESULTS, Integer.class); + if (maxResults != null) { + builder.maxResults(maxResults); + } + return ec2Client.describeInstanceStatus(builder.build()); + }, + "Describe Instances Status", + (DescribeInstanceStatusResponse response, Message message) -> { + message.setHeader(AWS2EC2Constants.NEXT_TOKEN, response.nextToken()); + message.setHeader(AWS2EC2Constants.IS_TRUNCATED, response.nextToken() != null); + }); } @SuppressWarnings("unchecked") @@ -648,6 +642,82 @@ public class AWS2EC2Producer extends DefaultProducer { return exchange.getMessage(); } + /** + * Executes an EC2 operation with POJO request support. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName) + throws InvalidPayloadException { + executeOperation(exchange, requestClass, pojoExecutor, headerExecutor, operationName, null); + } + + /** + * Executes an EC2 operation with POJO request support and optional response post-processing. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName, + BiConsumer<RES, Message> responseProcessor) + throws InvalidPayloadException { + + RES result; + if (getConfiguration().isPojoRequest()) { + Object payload = exchange.getIn().getMandatoryBody(); + if (requestClass.isInstance(payload)) { + try { + result = pojoExecutor.apply(requestClass.cast(payload)); + } catch (AwsServiceException ase) { + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); + throw ase; + } + LOG.trace("{} request performing", operationName); + } else { + throw new IllegalArgumentException( + String.format("Expected body of type %s but was %s", + requestClass.getName(), + payload != null ? payload.getClass().getName() : "null")); + } + } else { + try { + result = headerExecutor.get(); + } catch (AwsServiceException ase) { + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); + throw ase; + } + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + if (responseProcessor != null) { + responseProcessor.accept(result, message); + } + } + + /** + * Gets a required header value or throws an IllegalArgumentException. + */ + @SuppressWarnings("unchecked") + private <T> T getRequiredHeader(Exchange exchange, String headerName, Class<T> headerType, String errorMessage) { + T value = exchange.getIn().getHeader(headerName, headerType); + if (ObjectHelper.isEmpty(value)) { + throw new IllegalArgumentException(errorMessage); + } + return value; + } + + /** + * Gets an optional header value. + */ + private <T> T getOptionalHeader(Exchange exchange, String headerName, Class<T> headerType) { + return exchange.getIn().getHeader(headerName, headerType); + } + @Override protected void doStart() throws Exception { // health-check is optional so discover and resolve diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2EC2EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2EC2EndpointBuilderFactory.java index 606df57157c7..1513146f371e 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2EC2EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2EC2EndpointBuilderFactory.java @@ -783,6 +783,42 @@ public interface AWS2EC2EndpointBuilderFactory { public String awsEC2SubnetId() { return "CamelAwsEC2SubnetId"; } + /** + * The token for the next set of results. + * + * The option is a: {@code String} type. + * + * Group: describeInstances describeInstancesStatus + * + * @return the name of the header {@code AwsEC2NextToken}. + */ + public String awsEC2NextToken() { + return "CamelAwsEC2NextToken"; + } + /** + * The maximum number of results to return. + * + * The option is a: {@code Integer} type. + * + * Group: describeInstances describeInstancesStatus + * + * @return the name of the header {@code AwsEC2MaxResults}. + */ + public String awsEC2MaxResults() { + return "CamelAwsEC2MaxResults"; + } + /** + * Whether the response has more results (is truncated). + * + * The option is a: {@code Boolean} type. + * + * Group: describeInstances describeInstancesStatus + * + * @return the name of the header {@code AwsEC2IsTruncated}. + */ + public String awsEC2IsTruncated() { + return "CamelAwsEC2IsTruncated"; + } } static AWS2EC2EndpointBuilder endpointBuilder(String componentName, String path) { class AWS2EC2EndpointBuilderImpl extends AbstractEndpointBuilder implements AWS2EC2EndpointBuilder, AdvancedAWS2EC2EndpointBuilder {
