This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit fb6ed629643ad4afbe960dcbb2184f0e13fd99e5 Author: Andrea Cosentino <[email protected]> AuthorDate: Fri Apr 17 08:32:52 2020 +0200 CAMEL-14618 - Camel-aws-s3: Add an option to consumer to be able to move the consumed files to another bucket --- .../aws2/s3/AWS2S3ComponentConfigurer.java | 10 +++ .../aws2/s3/AWS2S3EndpointConfigurer.java | 10 +++ .../apache/camel/component/aws2/s3/aws2-s3.json | 4 ++ .../src/main/docs/aws2-s3-component.adoc | 8 ++- .../component/aws2/s3/AWS2S3Configuration.java | 28 ++++++++ .../camel/component/aws2/s3/AWS2S3Consumer.java | 16 +++++ .../s3/integration/S3ConsumerIntegrationTest.java | 78 ++++++++++++++++++++++ .../dsl/Aws2S3ComponentBuilderFactory.java | 29 ++++++++ .../endpoint/dsl/AWS2S3EndpointBuilderFactory.java | 42 ++++++++++++ 9 files changed, 223 insertions(+), 2 deletions(-) diff --git a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java index a52e6f2..a4c64ae 100644 --- a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java +++ b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java @@ -46,6 +46,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme case "deleteafterwrite": case "deleteAfterWrite": getOrCreateConfiguration(target).setDeleteAfterWrite(property(camelContext, boolean.class, value)); return true; case "delimiter": getOrCreateConfiguration(target).setDelimiter(property(camelContext, java.lang.String.class, value)); return true; + case "destinationbucket": + case "destinationBucket": getOrCreateConfiguration(target).setDestinationBucket(property(camelContext, java.lang.String.class, value)); return true; case "filename": case "fileName": getOrCreateConfiguration(target).setFileName(property(camelContext, java.lang.String.class, value)); return true; case "includebody": @@ -56,6 +58,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme case "keyName": getOrCreateConfiguration(target).setKeyName(property(camelContext, java.lang.String.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; + case "moveafterread": + case "moveAfterRead": getOrCreateConfiguration(target).setMoveAfterRead(property(camelContext, boolean.class, value)); return true; case "multipartupload": case "multiPartUpload": getOrCreateConfiguration(target).setMultiPartUpload(property(camelContext, boolean.class, value)); return true; case "operation": getOrCreateConfiguration(target).setOperation(property(camelContext, org.apache.camel.component.aws2.s3.AWS2S3Operations.class, value)); return true; @@ -102,11 +106,13 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme answer.put("deleteAfterRead", boolean.class); answer.put("deleteAfterWrite", boolean.class); answer.put("delimiter", java.lang.String.class); + answer.put("destinationBucket", java.lang.String.class); answer.put("fileName", java.lang.String.class); answer.put("includeBody", boolean.class); answer.put("includeFolders", boolean.class); answer.put("keyName", java.lang.String.class); answer.put("lazyStartProducer", boolean.class); + answer.put("moveAfterRead", boolean.class); answer.put("multiPartUpload", boolean.class); answer.put("operation", org.apache.camel.component.aws2.s3.AWS2S3Operations.class); answer.put("overrideEndpoint", boolean.class); @@ -150,6 +156,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme case "deleteafterwrite": case "deleteAfterWrite": return getOrCreateConfiguration(target).isDeleteAfterWrite(); case "delimiter": return getOrCreateConfiguration(target).getDelimiter(); + case "destinationbucket": + case "destinationBucket": return getOrCreateConfiguration(target).getDestinationBucket(); case "filename": case "fileName": return getOrCreateConfiguration(target).getFileName(); case "includebody": @@ -160,6 +168,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme case "keyName": return getOrCreateConfiguration(target).getKeyName(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); + case "moveafterread": + case "moveAfterRead": return getOrCreateConfiguration(target).isMoveAfterRead(); case "multipartupload": case "multiPartUpload": return getOrCreateConfiguration(target).isMultiPartUpload(); case "operation": return getOrCreateConfiguration(target).getOperation(); diff --git a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java index 255e552..6d825e99 100644 --- a/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java +++ b/components/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java @@ -45,6 +45,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen case "deleteafterwrite": case "deleteAfterWrite": target.getConfiguration().setDeleteAfterWrite(property(camelContext, boolean.class, value)); return true; case "delimiter": target.getConfiguration().setDelimiter(property(camelContext, java.lang.String.class, value)); return true; + case "destinationbucket": + case "destinationBucket": target.getConfiguration().setDestinationBucket(property(camelContext, java.lang.String.class, value)); return true; case "exceptionhandler": case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true; case "exchangepattern": @@ -66,6 +68,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen case "maxConnections": target.setMaxConnections(property(camelContext, int.class, value)); return true; case "maxmessagesperpoll": case "maxMessagesPerPoll": target.setMaxMessagesPerPoll(property(camelContext, int.class, value)); return true; + case "moveafterread": + case "moveAfterRead": target.getConfiguration().setMoveAfterRead(property(camelContext, boolean.class, value)); return true; case "multipartupload": case "multiPartUpload": target.getConfiguration().setMultiPartUpload(property(camelContext, boolean.class, value)); return true; case "operation": target.getConfiguration().setOperation(property(camelContext, org.apache.camel.component.aws2.s3.AWS2S3Operations.class, value)); return true; @@ -135,6 +139,7 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen answer.put("deleteAfterRead", boolean.class); answer.put("deleteAfterWrite", boolean.class); answer.put("delimiter", java.lang.String.class); + answer.put("destinationBucket", java.lang.String.class); answer.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class); answer.put("exchangePattern", org.apache.camel.ExchangePattern.class); answer.put("fileName", java.lang.String.class); @@ -146,6 +151,7 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen answer.put("lazyStartProducer", boolean.class); answer.put("maxConnections", int.class); answer.put("maxMessagesPerPoll", int.class); + answer.put("moveAfterRead", boolean.class); answer.put("multiPartUpload", boolean.class); answer.put("operation", org.apache.camel.component.aws2.s3.AWS2S3Operations.class); answer.put("overrideEndpoint", boolean.class); @@ -206,6 +212,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen case "deleteafterwrite": case "deleteAfterWrite": return target.getConfiguration().isDeleteAfterWrite(); case "delimiter": return target.getConfiguration().getDelimiter(); + case "destinationbucket": + case "destinationBucket": return target.getConfiguration().getDestinationBucket(); case "exceptionhandler": case "exceptionHandler": return target.getExceptionHandler(); case "exchangepattern": @@ -227,6 +235,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen case "maxConnections": return target.getMaxConnections(); case "maxmessagesperpoll": case "maxMessagesPerPoll": return target.getMaxMessagesPerPoll(); + case "moveafterread": + case "moveAfterRead": return target.getConfiguration().isMoveAfterRead(); case "multipartupload": case "multiPartUpload": return target.getConfiguration().isMultiPartUpload(); case "operation": return target.getConfiguration().getOperation(); diff --git a/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json b/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json index 2e5330d..13c300d 100644 --- a/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json +++ b/components/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json @@ -36,9 +36,11 @@ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by [...] "deleteAfterRead": { "kind": "property", "displayName": "Delete After Read", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committe [...] "delimiter": { "kind": "property", "displayName": "Delimiter", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The delimiter which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in." }, + "destinationBucket": { "kind": "property", "displayName": "Destination Bucket", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Define the destination bucket where an object must be moved when moveAfterRead is set to true." }, "fileName": { "kind": "property", "displayName": "File Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "To get the object from the bucket with the given file name" }, "includeBody": { "kind": "property", "displayName": "Include Body", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If it is true, the exchange body will be set to a stream to the contents of the file. If false, the headers will be set [...] "includeFolders": { "kind": "property", "displayName": "Include Folders", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If it is true, the folders\/directories will be consumed. If it is false, they will be ignored, and Exchanges will [...] + "moveAfterRead": { "kind": "property", "displayName": "Move After Read", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Move objects from S3 bucket to a different bucket after they have been retrieved. The copy bucket operation is only [...] "prefix": { "kind": "property", "displayName": "Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The prefix which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in." }, "autocloseBody": { "kind": "property", "displayName": "Autoclose Body", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If this option is true and includeBody is true, then the S3Object.close() method will be called [...] "deleteAfterWrite": { "kind": "property", "displayName": "Delete After Write", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Delete file object after the S3 file has been uploaded" }, @@ -70,11 +72,13 @@ "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...] "deleteAfterRead": { "kind": "parameter", "displayName": "Delete After Read", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committ [...] "delimiter": { "kind": "parameter", "displayName": "Delimiter", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The delimiter which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in." }, + "destinationBucket": { "kind": "parameter", "displayName": "Destination Bucket", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Define the destination bucket where an object must be moved when moveAfterRead is set to true." }, "fileName": { "kind": "parameter", "displayName": "File Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "To get the object from the bucket with the given file name" }, "includeBody": { "kind": "parameter", "displayName": "Include Body", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If it is true, the exchange body will be set to a stream to the contents of the file. If false, the headers will be set [...] "includeFolders": { "kind": "parameter", "displayName": "Include Folders", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If it is true, the folders\/directories will be consumed. If it is false, they will be ignored, and Exchanges wil [...] "maxConnections": { "kind": "parameter", "displayName": "Max Connections", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "60", "description": "Set the maxConnections parameter in the S3 client configuration" }, "maxMessagesPerPoll": { "kind": "parameter", "displayName": "Max Messages Per Poll", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "10", "description": "Gets the maximum number of messages as a limit to poll at each polling. Gets the maximum number of messages as a limit to poll at each polling. The default value is 10. Use 0 or a negative number to set it as unlimited." }, + "moveAfterRead": { "kind": "parameter", "displayName": "Move After Read", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Move objects from S3 bucket to a different bucket after they have been retrieved. The copy bucket operation is only [...] "prefix": { "kind": "parameter", "displayName": "Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The prefix which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in." }, "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send Empty Message When Idle", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead." }, "autocloseBody": { "kind": "parameter", "displayName": "Autoclose Body", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If this option is true and includeBody is true, then the S3Object.close() method will be called [...] diff --git a/components/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc b/components/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc index 677e49f..2bacfc9 100644 --- a/components/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc +++ b/components/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc @@ -47,7 +47,7 @@ from("aws2-s3://helloBucket?accessKey=yourAccessKey&secretKey=yourSecretKey&pref // component options: START -The AWS2 S3 Storage Service component supports 32 options, which are listed below. +The AWS2 S3 Storage Service component supports 34 options, which are listed below. @@ -69,9 +69,11 @@ The AWS2 S3 Storage Service component supports 32 options, which are listed belo | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *deleteAfterRead* (consumer) | Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs, the object is not deleted. If this option is false, then the same objects will be retrieve over and over again on the polls. Therefore you need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the AWS2S3Constants#BUCKET_NAME and AWS2S3Constants#KEY headers, or only the AWS2S3C [...] | *delimiter* (consumer) | The delimiter which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in. | | String +| *destinationBucket* (consumer) | Define the destination bucket where an object must be moved when moveAfterRead is set to true. | | String | *fileName* (consumer) | To get the object from the bucket with the given file name | | String | *includeBody* (consumer) | If it is true, the exchange body will be set to a stream to the contents of the file. If false, the headers will be set with the S3 object metadata, but the body will be null. This option is strongly related to autocloseBody option. In case of setting includeBody to true and autocloseBody to false, it will be up to the caller to close the S3Object stream. Setting autocloseBody to true, will close the S3Object stream automatically. | true | boolean | *includeFolders* (consumer) | If it is true, the folders/directories will be consumed. If it is false, they will be ignored, and Exchanges will not be created for those | true | boolean +| *moveAfterRead* (consumer) | Move objects from S3 bucket to a different bucket after they have been retrieved. The copy bucket operation is only performed if the Exchange is committed. If a rollback occurs, the object is not moved. | false | boolean | *prefix* (consumer) | The prefix which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in. | | String | *autocloseBody* (consumer) | If this option is true and includeBody is true, then the S3Object.close() method will be called on exchange completion. This option is strongly related to includeBody option. In case of setting includeBody to true and autocloseBody to false, it will be up to the caller to close the S3Object stream. Setting autocloseBody to true, will close the S3Object stream automatically. | true | boolean | *deleteAfterWrite* (producer) | Delete file object after the S3 file has been uploaded | false | boolean @@ -118,7 +120,7 @@ with the following path and query parameters: |=== -=== Query Parameters (52 parameters): +=== Query Parameters (54 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -138,11 +140,13 @@ with the following path and query parameters: | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *deleteAfterRead* (consumer) | Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs, the object is not deleted. If this option is false, then the same objects will be retrieve over and over again on the polls. Therefore you need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the AWS2S3Constants#BUCKET_NAME and AWS2S3Constants#KEY headers, or only the AWS2S3C [...] | *delimiter* (consumer) | The delimiter which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in. | | String +| *destinationBucket* (consumer) | Define the destination bucket where an object must be moved when moveAfterRead is set to true. | | String | *fileName* (consumer) | To get the object from the bucket with the given file name | | String | *includeBody* (consumer) | If it is true, the exchange body will be set to a stream to the contents of the file. If false, the headers will be set with the S3 object metadata, but the body will be null. This option is strongly related to autocloseBody option. In case of setting includeBody to true and autocloseBody to false, it will be up to the caller to close the S3Object stream. Setting autocloseBody to true, will close the S3Object stream automatically. | true | boolean | *includeFolders* (consumer) | If it is true, the folders/directories will be consumed. If it is false, they will be ignored, and Exchanges will not be created for those | true | boolean | *maxConnections* (consumer) | Set the maxConnections parameter in the S3 client configuration | 60 | int | *maxMessagesPerPoll* (consumer) | Gets the maximum number of messages as a limit to poll at each polling. Gets the maximum number of messages as a limit to poll at each polling. The default value is 10. Use 0 or a negative number to set it as unlimited. | 10 | int +| *moveAfterRead* (consumer) | Move objects from S3 bucket to a different bucket after they have been retrieved. The copy bucket operation is only performed if the Exchange is committed. If a rollback occurs, the object is not moved. | false | boolean | *prefix* (consumer) | The prefix which is used in the com.amazonaws.services.s3.model.ListObjectsRequest to only consume objects we are interested in. | | String | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean | *autocloseBody* (consumer) | If this option is true and includeBody is true, then the S3Object.close() method will be called on exchange completion. This option is strongly related to includeBody option. In case of setting includeBody to true and autocloseBody to false, it will be up to the caller to close the S3Object stream. Setting autocloseBody to true, will close the S3Object stream automatically. | true | boolean diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java index 17055cc..99cb2b3 100644 --- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java +++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java @@ -44,6 +44,10 @@ public class AWS2S3Configuration implements Cloneable { private String region; @UriParam(label = "consumer", defaultValue = "true") private boolean deleteAfterRead = true; + @UriParam(label = "consumer") + private boolean moveAfterRead; + @UriParam(label = "consumer") + private String destinationBucket; @UriParam(label = "producer") private boolean deleteAfterWrite; @UriParam(label = "producer") @@ -252,6 +256,30 @@ public class AWS2S3Configuration implements Cloneable { this.deleteAfterRead = deleteAfterRead; } + public boolean isMoveAfterRead() { + return moveAfterRead; + } + + /** + * Move objects from S3 bucket to a different bucket after they have been retrieved. + * The copy bucket operation is only performed if the Exchange is committed. If a rollback occurs, the object + * is not moved. + */ + public void setMoveAfterRead(boolean moveAfterRead) { + this.moveAfterRead = moveAfterRead; + } + + public String getDestinationBucket() { + return destinationBucket; + } + + /** + * Define the destination bucket where an object must be moved when moveAfterRead is set to true. + */ + public void setDestinationBucket(String destinationBucket) { + this.destinationBucket = destinationBucket; + } + public boolean isDeleteAfterWrite() { return deleteAfterWrite; } diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java index 0dddb91..0fa3fda 100644 --- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java +++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java @@ -39,6 +39,7 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -223,6 +224,21 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build()); LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key); + } else if (getConfiguration().isMoveAfterRead()) { + String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class); + String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class); + + LOG.trace("Moving object from bucket {} with key {} to bucket {}...", bucketName, key, getConfiguration().getDestinationBucket()); + + getAmazonS3Client().copyObject(CopyObjectRequest.builder().destinationKey(key).destinationBucket(getConfiguration().getDestinationBucket()).copySource(bucketName + "/" + key).build()); + + LOG.trace("Moved object from bucket {} with key {} to bucket {}...", bucketName, key, getConfiguration().getDestinationBucket()); + + LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key); + + getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build()); + + LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key); } } catch (AwsServiceException e) { getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e); diff --git a/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java new file mode 100644 index 0000000..7a17893 --- /dev/null +++ b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.s3.integration; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +@Disabled("Must be manually tested. Provide your own accessKey and secretKey!") +public class S3ConsumerIntegrationTest extends CamelTestSupport { + + @BindToRegistry("amazonS3Client") + S3Client client = S3Client.builder().credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("xxx", "yyy"))).region(Region.EU_WEST_1).build(); + + @EndpointInject + private ProducerTemplate template; + + @EndpointInject("mock:result") + private MockEndpoint result; + + @Test + public void sendIn() throws Exception { + result.expectedMessageCount(1); + + template.send("direct:putObject", new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(AWS2S3Constants.KEY, "test.txt"); + exchange.getIn().setBody("Test"); + } + }); + + Thread.sleep(5000); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + String awsEndpoint = "aws2-s3://mycamel?autoCreateBucket=false"; + + from("direct:putObject").startupOrder(1).to(awsEndpoint).to("mock:result"); + + from("aws2-s3://mycamel?moveAfterRead=true&deleteAfterRead=false&destinationBucket=camel-kafka-connector").startupOrder(2).log("${header.key}"); + + } + }; + } +} diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java index d95ecd2..6543508 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java @@ -255,6 +255,19 @@ public interface Aws2S3ComponentBuilderFactory { return this; } /** + * Define the destination bucket where an object must be moved when + * moveAfterRead is set to true. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: consumer + */ + default Aws2S3ComponentBuilder destinationBucket( + java.lang.String destinationBucket) { + doSetProperty("destinationBucket", destinationBucket); + return this; + } + /** * To get the object from the bucket with the given file name. * * The option is a: <code>java.lang.String</code> type. @@ -298,6 +311,20 @@ public interface Aws2S3ComponentBuilderFactory { return this; } /** + * Move objects from S3 bucket to a different bucket after they have + * been retrieved. The copy bucket operation is only performed if the + * Exchange is committed. If a rollback occurs, the object is not moved. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default Aws2S3ComponentBuilder moveAfterRead(boolean moveAfterRead) { + doSetProperty("moveAfterRead", moveAfterRead); + return this; + } + /** * The prefix which is used in the * com.amazonaws.services.s3.model.ListObjectsRequest to only consume * objects we are interested in. @@ -523,9 +550,11 @@ public interface Aws2S3ComponentBuilderFactory { case "bridgeErrorHandler": ((AWS2S3Component) component).setBridgeErrorHandler((boolean) value); return true; case "deleteAfterRead": getOrCreateConfiguration((AWS2S3Component) component).setDeleteAfterRead((boolean) value); return true; case "delimiter": getOrCreateConfiguration((AWS2S3Component) component).setDelimiter((java.lang.String) value); return true; + case "destinationBucket": getOrCreateConfiguration((AWS2S3Component) component).setDestinationBucket((java.lang.String) value); return true; case "fileName": getOrCreateConfiguration((AWS2S3Component) component).setFileName((java.lang.String) value); return true; case "includeBody": getOrCreateConfiguration((AWS2S3Component) component).setIncludeBody((boolean) value); return true; case "includeFolders": getOrCreateConfiguration((AWS2S3Component) component).setIncludeFolders((boolean) value); return true; + case "moveAfterRead": getOrCreateConfiguration((AWS2S3Component) component).setMoveAfterRead((boolean) value); return true; case "prefix": getOrCreateConfiguration((AWS2S3Component) component).setPrefix((java.lang.String) value); return true; case "autocloseBody": getOrCreateConfiguration((AWS2S3Component) component).setAutocloseBody((boolean) value); return true; case "deleteAfterWrite": getOrCreateConfiguration((AWS2S3Component) component).setDeleteAfterWrite((boolean) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java index 865040a..6d205c2 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java @@ -371,6 +371,19 @@ public interface AWS2S3EndpointBuilderFactory { return this; } /** + * Define the destination bucket where an object must be moved when + * moveAfterRead is set to true. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: consumer + */ + default AWS2S3EndpointConsumerBuilder destinationBucket( + String destinationBucket) { + doSetProperty("destinationBucket", destinationBucket); + return this; + } + /** * To get the object from the bucket with the given file name. * * The option is a: <code>java.lang.String</code> type. @@ -505,6 +518,35 @@ public interface AWS2S3EndpointBuilderFactory { return this; } /** + * Move objects from S3 bucket to a different bucket after they have + * been retrieved. The copy bucket operation is only performed if the + * Exchange is committed. If a rollback occurs, the object is not moved. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default AWS2S3EndpointConsumerBuilder moveAfterRead( + boolean moveAfterRead) { + doSetProperty("moveAfterRead", moveAfterRead); + return this; + } + /** + * Move objects from S3 bucket to a different bucket after they have + * been retrieved. The copy bucket operation is only performed if the + * Exchange is committed. If a rollback occurs, the object is not moved. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default AWS2S3EndpointConsumerBuilder moveAfterRead(String moveAfterRead) { + doSetProperty("moveAfterRead", moveAfterRead); + return this; + } + /** * The prefix which is used in the * com.amazonaws.services.s3.model.ListObjectsRequest to only consume * objects we are interested in.
