This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-22805-part-2 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 189b44cdb7686434b0865defc3ea73afb241b637 Author: Andrea Cosentino <[email protected]> AuthorDate: Thu Jan 8 11:43:17 2026 +0100 CAMEL-22825 - Camel-AWS components: Avoid duplicated code and add pagination to producer operation where it makes sense - AWS Athena Signed-off-by: Andrea Cosentino <[email protected]> --- .../camel/catalog/components/aws2-athena.json | 33 ++-- .../camel/component/aws2/athena/aws2-athena.json | 33 ++-- .../component/aws2/athena/Athena2Constants.java | 5 + .../component/aws2/athena/Athena2Producer.java | 218 ++++++++++++--------- .../dsl/Athena2EndpointBuilderFactory.java | 13 ++ 5 files changed, 175 insertions(+), 127 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-athena.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-athena.json index ae22a1de72be..a700058ce79c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-athena.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-athena.json @@ -71,22 +71,23 @@ "CamelAwsAthenaWorkGroup": { "index": 3, "kind": "header", "displayName": "", "group": "listQueryExecutions startQueryExecution", "label": "listQueryExecutions startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The workgroup to use for running the query.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#WORK_GROUP" }, "CamelAwsAthenaNextToken": { "index": 4, "kind": "header", "displayName": "", "group": "getQueryResults listQueryExecutions", "label": "getQueryResults listQueryExecutions", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Pagination token to use in the case where the response from the previous request was truncated.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#NEXT_ [...] "CamelAwsAthenaMaxResults": { "index": 5, "kind": "header", "displayName": "", "group": "getQueryResults listQueryExecutions", "label": "getQueryResults listQueryExecutions", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Max number of results to return for the given operation (if supported by the Athena API endpoint). If not set, will use the Athena API default for the given operation.", "con [...] - "CamelAwsAthenaIncludeTrace": { "index": 6, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Include useful trace information at the beginning of queries as an SQL comment (prefixed with --).", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#INCLUDE_TRACE" }, - "CamelAwsAthenaOutputLocation": { "index": 7, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The location in Amazon S3 where query results are stored, such as s3:\/\/path\/to\/query\/bucket\/. Ensure this value ends with a forwa [...] - "CamelAwsAthenaOutputType": { "index": 8, "kind": "header", "displayName": "", "group": "getQueryResults", "label": "getQueryResults", "required": false, "javaType": "org.apache.camel.component.aws2.athena.Athena2OutputType", "enum": [ "StreamList", "SelectList", "S3Pointer" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "How query results should be returned. One of StreamList (default - return a GetQueryResultsIterable that can pag [...] - "CamelAwsAthenaQueryExecutionState": { "index": 9, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.QueryExecutionState", "enum": [ "QUEUED", "RUNNING", "SUCCEEDED", "FAILED", "CANCELLED", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "descriptio [...] - "CamelAwsAthenaClientRequestToken": { "index": 10, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A unique string to ensure issues queries are idempotent. It is unlikely you will need to set this.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#CLIENT_REQUEST_TOKEN" }, - "CamelAwsAthenaQueryString": { "index": 11, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The SQL query to run. Except for simple queries, prefer setting this as the body of the Exchange or as this header to avoid having to deal with URL encoding issues.", "constantName": "org.apache.camel.comp [...] - "CamelAwsAthenaEncryptionOption": { "index": 12, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.EncryptionOption", "enum": [ "SSE_S3", "SSE_KMS", "CSE_KMS", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The encryption type to use when storing query results in S3.", "constantName": "org.apache.ca [...] - "CamelAwsAthenaKmsKey": { "index": 13, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "For SSE-KMS and CSE-KMS, this is the KMS key ARN or ID.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#KMS_KEY" }, - "CamelAwsAthenaWaitTimeout": { "index": 14, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional max wait time in millis to wait for a successful query completion. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.compo [...] - "CamelAwsAthenaInitialDelay": { "index": 15, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the first poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.at [...] - "CamelAwsAthenaDelay": { "index": 16, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the next poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.athena.Ath [...] - "CamelAwsAthenaMaxAttempts": { "index": 17, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Maximum number of times to attempt a query. Set to 1 to disable retries. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component [...] - "CamelAwsAthenaRetry": { "index": 18, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional comma separated list of error types to retry the query for. Use 'retryable' to retry all retryable failure conditions (e.g. generic errors and resources exhausted), 'generic' to retry 'GENERIC_INTERNAL_ [...] - "CamelAwsAthenaResetWaitTimeoutOnRetry": { "index": 19, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Reset the waitTimeout countdown in the event of a query retry. If set to true, potential max time spent waiting for queries is equal to waitTimeout x maxAttempts. See the section 'Waiting for [...] - "CamelAwsAthenaStartQueryExecutionAttempts": { "index": 20, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of attempts made to run the query. Will be greater than 1 if the query is retried.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#START_QUERY_EXECUTION [...] - "CamelAwsAthenaStartQueryExecutionElapsedMillis": { "index": 21, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total time in millis taken in startQueryExecution (mostly relevant when waiting for query completion within startQueryExecution).", "constantName": "org.apache.camel.component.aws2.athen [...] + "CamelAwsAthenaIsTruncated": { "index": 6, "kind": "header", "displayName": "", "group": "getQueryResults listQueryExecutions", "label": "getQueryResults listQueryExecutions", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the response has more results (i.e., is truncated). If true, use the NEXT_TOKEN header to fetch the next page.", "constantName": "org.apache.camel.component.aws2.ath [...] + "CamelAwsAthenaIncludeTrace": { "index": 7, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Include useful trace information at the beginning of queries as an SQL comment (prefixed with --).", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#INCLUDE_TRACE" }, + "CamelAwsAthenaOutputLocation": { "index": 8, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The location in Amazon S3 where query results are stored, such as s3:\/\/path\/to\/query\/bucket\/. Ensure this value ends with a forwa [...] + "CamelAwsAthenaOutputType": { "index": 9, "kind": "header", "displayName": "", "group": "getQueryResults", "label": "getQueryResults", "required": false, "javaType": "org.apache.camel.component.aws2.athena.Athena2OutputType", "enum": [ "StreamList", "SelectList", "S3Pointer" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "How query results should be returned. One of StreamList (default - return a GetQueryResultsIterable that can pag [...] + "CamelAwsAthenaQueryExecutionState": { "index": 10, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.QueryExecutionState", "enum": [ "QUEUED", "RUNNING", "SUCCEEDED", "FAILED", "CANCELLED", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "descripti [...] + "CamelAwsAthenaClientRequestToken": { "index": 11, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A unique string to ensure issues queries are idempotent. It is unlikely you will need to set this.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#CLIENT_REQUEST_TOKEN" }, + "CamelAwsAthenaQueryString": { "index": 12, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The SQL query to run. Except for simple queries, prefer setting this as the body of the Exchange or as this header to avoid having to deal with URL encoding issues.", "constantName": "org.apache.camel.comp [...] + "CamelAwsAthenaEncryptionOption": { "index": 13, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.EncryptionOption", "enum": [ "SSE_S3", "SSE_KMS", "CSE_KMS", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The encryption type to use when storing query results in S3.", "constantName": "org.apache.ca [...] + "CamelAwsAthenaKmsKey": { "index": 14, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "For SSE-KMS and CSE-KMS, this is the KMS key ARN or ID.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#KMS_KEY" }, + "CamelAwsAthenaWaitTimeout": { "index": 15, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional max wait time in millis to wait for a successful query completion. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.compo [...] + "CamelAwsAthenaInitialDelay": { "index": 16, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the first poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.at [...] + "CamelAwsAthenaDelay": { "index": 17, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the next poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.athena.Ath [...] + "CamelAwsAthenaMaxAttempts": { "index": 18, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Maximum number of times to attempt a query. Set to 1 to disable retries. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component [...] + "CamelAwsAthenaRetry": { "index": 19, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional comma separated list of error types to retry the query for. Use 'retryable' to retry all retryable failure conditions (e.g. generic errors and resources exhausted), 'generic' to retry 'GENERIC_INTERNAL_ [...] + "CamelAwsAthenaResetWaitTimeoutOnRetry": { "index": 20, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Reset the waitTimeout countdown in the event of a query retry. If set to true, potential max time spent waiting for queries is equal to waitTimeout x maxAttempts. See the section 'Waiting for [...] + "CamelAwsAthenaStartQueryExecutionAttempts": { "index": 21, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of attempts made to run the query. Will be greater than 1 if the query is retried.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#START_QUERY_EXECUTION [...] + "CamelAwsAthenaStartQueryExecutionElapsedMillis": { "index": 22, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total time in millis taken in startQueryExecution (mostly relevant when waiting for query completion within startQueryExecution).", "constantName": "org.apache.camel.component.aws2.athen [...] }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.athena.Athena2Configuration", "configurationField": "configuration", "description": "Logical name" }, diff --git a/components/camel-aws/camel-aws2-athena/src/generated/resources/META-INF/org/apache/camel/component/aws2/athena/aws2-athena.json b/components/camel-aws/camel-aws2-athena/src/generated/resources/META-INF/org/apache/camel/component/aws2/athena/aws2-athena.json index ae22a1de72be..a700058ce79c 100644 --- a/components/camel-aws/camel-aws2-athena/src/generated/resources/META-INF/org/apache/camel/component/aws2/athena/aws2-athena.json +++ b/components/camel-aws/camel-aws2-athena/src/generated/resources/META-INF/org/apache/camel/component/aws2/athena/aws2-athena.json @@ -71,22 +71,23 @@ "CamelAwsAthenaWorkGroup": { "index": 3, "kind": "header", "displayName": "", "group": "listQueryExecutions startQueryExecution", "label": "listQueryExecutions startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The workgroup to use for running the query.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#WORK_GROUP" }, "CamelAwsAthenaNextToken": { "index": 4, "kind": "header", "displayName": "", "group": "getQueryResults listQueryExecutions", "label": "getQueryResults listQueryExecutions", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Pagination token to use in the case where the response from the previous request was truncated.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#NEXT_ [...] "CamelAwsAthenaMaxResults": { "index": 5, "kind": "header", "displayName": "", "group": "getQueryResults listQueryExecutions", "label": "getQueryResults listQueryExecutions", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Max number of results to return for the given operation (if supported by the Athena API endpoint). If not set, will use the Athena API default for the given operation.", "con [...] - "CamelAwsAthenaIncludeTrace": { "index": 6, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Include useful trace information at the beginning of queries as an SQL comment (prefixed with --).", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#INCLUDE_TRACE" }, - "CamelAwsAthenaOutputLocation": { "index": 7, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The location in Amazon S3 where query results are stored, such as s3:\/\/path\/to\/query\/bucket\/. Ensure this value ends with a forwa [...] - "CamelAwsAthenaOutputType": { "index": 8, "kind": "header", "displayName": "", "group": "getQueryResults", "label": "getQueryResults", "required": false, "javaType": "org.apache.camel.component.aws2.athena.Athena2OutputType", "enum": [ "StreamList", "SelectList", "S3Pointer" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "How query results should be returned. One of StreamList (default - return a GetQueryResultsIterable that can pag [...] - "CamelAwsAthenaQueryExecutionState": { "index": 9, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.QueryExecutionState", "enum": [ "QUEUED", "RUNNING", "SUCCEEDED", "FAILED", "CANCELLED", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "descriptio [...] - "CamelAwsAthenaClientRequestToken": { "index": 10, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A unique string to ensure issues queries are idempotent. It is unlikely you will need to set this.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#CLIENT_REQUEST_TOKEN" }, - "CamelAwsAthenaQueryString": { "index": 11, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The SQL query to run. Except for simple queries, prefer setting this as the body of the Exchange or as this header to avoid having to deal with URL encoding issues.", "constantName": "org.apache.camel.comp [...] - "CamelAwsAthenaEncryptionOption": { "index": 12, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.EncryptionOption", "enum": [ "SSE_S3", "SSE_KMS", "CSE_KMS", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The encryption type to use when storing query results in S3.", "constantName": "org.apache.ca [...] - "CamelAwsAthenaKmsKey": { "index": 13, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "For SSE-KMS and CSE-KMS, this is the KMS key ARN or ID.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#KMS_KEY" }, - "CamelAwsAthenaWaitTimeout": { "index": 14, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional max wait time in millis to wait for a successful query completion. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.compo [...] - "CamelAwsAthenaInitialDelay": { "index": 15, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the first poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.at [...] - "CamelAwsAthenaDelay": { "index": 16, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the next poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.athena.Ath [...] - "CamelAwsAthenaMaxAttempts": { "index": 17, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Maximum number of times to attempt a query. Set to 1 to disable retries. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component [...] - "CamelAwsAthenaRetry": { "index": 18, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional comma separated list of error types to retry the query for. Use 'retryable' to retry all retryable failure conditions (e.g. generic errors and resources exhausted), 'generic' to retry 'GENERIC_INTERNAL_ [...] - "CamelAwsAthenaResetWaitTimeoutOnRetry": { "index": 19, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Reset the waitTimeout countdown in the event of a query retry. If set to true, potential max time spent waiting for queries is equal to waitTimeout x maxAttempts. See the section 'Waiting for [...] - "CamelAwsAthenaStartQueryExecutionAttempts": { "index": 20, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of attempts made to run the query. Will be greater than 1 if the query is retried.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#START_QUERY_EXECUTION [...] - "CamelAwsAthenaStartQueryExecutionElapsedMillis": { "index": 21, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total time in millis taken in startQueryExecution (mostly relevant when waiting for query completion within startQueryExecution).", "constantName": "org.apache.camel.component.aws2.athen [...] + "CamelAwsAthenaIsTruncated": { "index": 6, "kind": "header", "displayName": "", "group": "getQueryResults listQueryExecutions", "label": "getQueryResults listQueryExecutions", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the response has more results (i.e., is truncated). If true, use the NEXT_TOKEN header to fetch the next page.", "constantName": "org.apache.camel.component.aws2.ath [...] + "CamelAwsAthenaIncludeTrace": { "index": 7, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Include useful trace information at the beginning of queries as an SQL comment (prefixed with --).", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#INCLUDE_TRACE" }, + "CamelAwsAthenaOutputLocation": { "index": 8, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The location in Amazon S3 where query results are stored, such as s3:\/\/path\/to\/query\/bucket\/. Ensure this value ends with a forwa [...] + "CamelAwsAthenaOutputType": { "index": 9, "kind": "header", "displayName": "", "group": "getQueryResults", "label": "getQueryResults", "required": false, "javaType": "org.apache.camel.component.aws2.athena.Athena2OutputType", "enum": [ "StreamList", "SelectList", "S3Pointer" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "How query results should be returned. One of StreamList (default - return a GetQueryResultsIterable that can pag [...] + "CamelAwsAthenaQueryExecutionState": { "index": 10, "kind": "header", "displayName": "", "group": "getQueryExecution getQueryResults startQueryExecution", "label": "getQueryExecution getQueryResults startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.QueryExecutionState", "enum": [ "QUEUED", "RUNNING", "SUCCEEDED", "FAILED", "CANCELLED", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "descripti [...] + "CamelAwsAthenaClientRequestToken": { "index": 11, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A unique string to ensure issues queries are idempotent. It is unlikely you will need to set this.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#CLIENT_REQUEST_TOKEN" }, + "CamelAwsAthenaQueryString": { "index": 12, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The SQL query to run. Except for simple queries, prefer setting this as the body of the Exchange or as this header to avoid having to deal with URL encoding issues.", "constantName": "org.apache.camel.comp [...] + "CamelAwsAthenaEncryptionOption": { "index": 13, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "software.amazon.awssdk.services.athena.model.EncryptionOption", "enum": [ "SSE_S3", "SSE_KMS", "CSE_KMS", "null" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The encryption type to use when storing query results in S3.", "constantName": "org.apache.ca [...] + "CamelAwsAthenaKmsKey": { "index": 14, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "For SSE-KMS and CSE-KMS, this is the KMS key ARN or ID.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#KMS_KEY" }, + "CamelAwsAthenaWaitTimeout": { "index": 15, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional max wait time in millis to wait for a successful query completion. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.compo [...] + "CamelAwsAthenaInitialDelay": { "index": 16, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the first poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.at [...] + "CamelAwsAthenaDelay": { "index": 17, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Milliseconds before the next poll for query execution status. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component.aws2.athena.Ath [...] + "CamelAwsAthenaMaxAttempts": { "index": 18, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Maximum number of times to attempt a query. Set to 1 to disable retries. See the section 'Waiting for Query Completion and Retrying Failed Queries' to learn more.", "constantName": "org.apache.camel.component [...] + "CamelAwsAthenaRetry": { "index": 19, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Optional comma separated list of error types to retry the query for. Use 'retryable' to retry all retryable failure conditions (e.g. generic errors and resources exhausted), 'generic' to retry 'GENERIC_INTERNAL_ [...] + "CamelAwsAthenaResetWaitTimeoutOnRetry": { "index": 20, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Reset the waitTimeout countdown in the event of a query retry. If set to true, potential max time spent waiting for queries is equal to waitTimeout x maxAttempts. See the section 'Waiting for [...] + "CamelAwsAthenaStartQueryExecutionAttempts": { "index": 21, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of attempts made to run the query. Will be greater than 1 if the query is retried.", "constantName": "org.apache.camel.component.aws2.athena.Athena2Constants#START_QUERY_EXECUTION [...] + "CamelAwsAthenaStartQueryExecutionElapsedMillis": { "index": 22, "kind": "header", "displayName": "", "group": "startQueryExecution", "label": "startQueryExecution", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total time in millis taken in startQueryExecution (mostly relevant when waiting for query completion within startQueryExecution).", "constantName": "org.apache.camel.component.aws2.athen [...] }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.athena.Athena2Configuration", "configurationField": "configuration", "description": "Logical name" }, diff --git a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Constants.java b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Constants.java index e051097dece1..e23bccfefe0b 100644 --- a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Constants.java +++ b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Constants.java @@ -46,6 +46,11 @@ public interface Athena2Constants { "If not set, will use the Athena API default for the given operation.", javaType = "Integer") String MAX_RESULTS = "CamelAwsAthenaMaxResults"; + @Metadata(label = "getQueryResults listQueryExecutions", + description = "Whether the response has more results (i.e., is truncated). " + + "If true, use the NEXT_TOKEN header to fetch the next page.", + javaType = "Boolean") + String IS_TRUNCATED = "CamelAwsAthenaIsTruncated"; @Metadata(label = "startQueryExecution", description = "Include useful trace information at the beginning of queries as an SQL comment (prefixed with \"--\").", javaType = "boolean") diff --git a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Producer.java b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Producer.java index 0ac20236220b..c87d2a602ba1 100644 --- a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Producer.java +++ b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2Producer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws2.athena; import java.util.Arrays; +import java.util.function.Supplier; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -132,6 +133,7 @@ public class Athena2Producer extends DefaultProducer { GetQueryResultsRequest request = doGetQueryResultsRequest(queryExecutionId, exchange).build(); GetQueryResultsResponse response = athenaClient.getQueryResults(request); message.setHeader(Athena2Constants.NEXT_TOKEN, response.nextToken()); + message.setHeader(Athena2Constants.IS_TRUNCATED, response.nextToken() != null); message.setBody(response); } else if (outputType == Athena2OutputType.S3Pointer) { GetQueryExecutionResponse response = doGetQueryExecution(queryExecutionId, athenaClient); @@ -148,14 +150,12 @@ public class Athena2Producer extends DefaultProducer { } private Athena2OutputType determineOutputType(Exchange exchange) { - Athena2OutputType outputType = exchange.getIn().getHeader(Athena2Constants.OUTPUT_TYPE, Athena2OutputType.class); - - if (ObjectHelper.isEmpty(outputType)) { - outputType = getConfiguration().getOutputType(); - LOG.trace("AWS Athena output type is missing, using default one [{}]", outputType); - } - - return outputType; + return getOptionalHeader( + exchange, + Athena2Constants.OUTPUT_TYPE, + Athena2OutputType.class, + () -> getConfiguration().getOutputType(), + "output type"); } private GetQueryResultsRequest.Builder doGetQueryResultsRequest(String queryExecutionId, Exchange exchange) { @@ -190,6 +190,7 @@ public class Athena2Producer extends DefaultProducer { ListQueryExecutionsResponse response = athenaClient.listQueryExecutions(request.build()); Message message = getMessageForResponse(exchange); message.setHeader(Athena2Constants.NEXT_TOKEN, response.nextToken()); + message.setHeader(Athena2Constants.IS_TRUNCATED, response.nextToken() != null); message.setBody(response); } @@ -281,72 +282,58 @@ public class Athena2Producer extends DefaultProducer { } private String determineQueryExecutionId(final Exchange exchange) { - String queryExecutionId = exchange.getIn().getHeader(Athena2Constants.QUERY_EXECUTION_ID, String.class); - - if (ObjectHelper.isEmpty(queryExecutionId)) { - queryExecutionId = getConfiguration().getQueryExecutionId(); - } - - if (ObjectHelper.isEmpty(queryExecutionId)) { - throw new IllegalArgumentException("AWS Athena query execution id is required."); - } - - return queryExecutionId; + return getRequiredHeader( + exchange, + Athena2Constants.QUERY_EXECUTION_ID, + String.class, + () -> getConfiguration().getQueryExecutionId(), + "AWS Athena query execution id is required."); } private Integer determineMaxResults(final Exchange exchange) { - Integer maxResults = exchange.getIn().getHeader(Athena2Constants.MAX_RESULTS, Integer.class); - - if (ObjectHelper.isEmpty(maxResults)) { - maxResults = getConfiguration().getMaxResults(); - LOG.trace("AWS Athena max results is missing, using default one [{}]", maxResults); - } - - return maxResults; + return getOptionalHeader( + exchange, + Athena2Constants.MAX_RESULTS, + Integer.class, + () -> getConfiguration().getMaxResults(), + "max results"); } private boolean determineIncludeTrace(final Exchange exchange) { - Boolean includeTrace = exchange.getIn().getHeader(Athena2Constants.INCLUDE_TRACE, Boolean.class); - - if (ObjectHelper.isEmpty(includeTrace)) { - includeTrace = getConfiguration().isIncludeTrace(); - LOG.trace("AWS Athena include trace is missing, using default one [{}]", includeTrace); - } - - return includeTrace; + Boolean includeTrace = getOptionalHeader( + exchange, + Athena2Constants.INCLUDE_TRACE, + Boolean.class, + () -> getConfiguration().isIncludeTrace(), + "include trace"); + return includeTrace != null ? includeTrace : false; } private String determineNextToken(final Exchange exchange) { - String nextToken = exchange.getIn().getHeader(Athena2Constants.NEXT_TOKEN, String.class); - - if (ObjectHelper.isEmpty(nextToken)) { - nextToken = getConfiguration().getNextToken(); - LOG.trace("AWS Athena next token is missing, using default one [{}]", nextToken); - } - - return nextToken; + return getOptionalHeader( + exchange, + Athena2Constants.NEXT_TOKEN, + String.class, + () -> getConfiguration().getNextToken(), + "next token"); } private String determineClientRequestToken(final Exchange exchange) { - String clientRequestToken = exchange.getIn().getHeader(Athena2Constants.CLIENT_REQUEST_TOKEN, String.class); - - if (ObjectHelper.isEmpty(clientRequestToken)) { - clientRequestToken = getConfiguration().getClientRequestToken(); - LOG.trace("AWS Athena client request token is missing, using default one [{}]", clientRequestToken); - } - - return clientRequestToken; + return getOptionalHeader( + exchange, + Athena2Constants.CLIENT_REQUEST_TOKEN, + String.class, + () -> getConfiguration().getClientRequestToken(), + "client request token"); } private String determineDatabase(final Exchange exchange) { - String database = exchange.getIn().getHeader(Athena2Constants.DATABASE, String.class); - - if (ObjectHelper.isEmpty(database)) { - database = getConfiguration().getDatabase(); - LOG.trace("AWS Athena database is missing, using default one [{}]", database); - } - - return database; + return getOptionalHeader( + exchange, + Athena2Constants.DATABASE, + String.class, + () -> getConfiguration().getDatabase(), + "database"); } private String determineQueryString(final Exchange exchange) { @@ -377,51 +364,39 @@ public class Athena2Producer extends DefaultProducer { } private EncryptionOption determineEncryptionOption(final Exchange exchange) { - EncryptionOption encryptionOption - = exchange.getIn().getHeader(Athena2Constants.ENCRYPTION_OPTION, EncryptionOption.class); - - if (ObjectHelper.isEmpty(encryptionOption)) { - encryptionOption = getConfiguration().getEncryptionOption(); - LOG.trace("AWS Athena encryption option is missing, using default one [{}]", encryptionOption); - } - - return encryptionOption; + return getOptionalHeader( + exchange, + Athena2Constants.ENCRYPTION_OPTION, + EncryptionOption.class, + () -> getConfiguration().getEncryptionOption(), + "encryption option"); } private String determineKmsKey(final Exchange exchange) { - String kmsKey = exchange.getIn().getHeader(Athena2Constants.KMS_KEY, String.class); - - if (ObjectHelper.isEmpty(kmsKey)) { - kmsKey = getConfiguration().getKmsKey(); - LOG.trace("AWS Athena kms key is missing, using default one [{}]", kmsKey); - } - - return kmsKey; + return getOptionalHeader( + exchange, + Athena2Constants.KMS_KEY, + String.class, + () -> getConfiguration().getKmsKey(), + "kms key"); } private String determineOutputLocation(final Exchange exchange) { - String outputLocation = exchange.getIn().getHeader(Athena2Constants.OUTPUT_LOCATION, String.class); - - if (ObjectHelper.isEmpty(outputLocation)) { - outputLocation = getConfiguration().getOutputLocation(); - } - - if (ObjectHelper.isEmpty(outputLocation)) { - throw new IllegalArgumentException("AWS Athena output location is required."); - } - - return outputLocation; + return getRequiredHeader( + exchange, + Athena2Constants.OUTPUT_LOCATION, + String.class, + () -> getConfiguration().getOutputLocation(), + "AWS Athena output location is required."); } private String determineWorkGroup(final Exchange exchange) { - String workGroup = exchange.getIn().getHeader(Athena2Constants.WORK_GROUP, String.class); - - if (ObjectHelper.isEmpty(workGroup)) { - workGroup = getConfiguration().getWorkGroup(); - LOG.trace("AWS Athena work group is missing, using default one [{}]", workGroup); - } - - return workGroup; + return getOptionalHeader( + exchange, + Athena2Constants.WORK_GROUP, + String.class, + () -> getConfiguration().getWorkGroup(), + "work group"); } protected Athena2Configuration getConfiguration() { @@ -433,6 +408,59 @@ public class Athena2Producer extends DefaultProducer { return (Athena2Endpoint) super.getEndpoint(); } + /** + * Gets an optional value from the exchange header, falling back to configuration if not present. + * + * @param exchange the Camel exchange + * @param headerName the header name to check + * @param headerType the expected type + * @param configurationValue supplier for the configuration fallback value + * @param parameterName name of the parameter for logging + * @param <T> the value type + * @return the value from header or configuration, may be null + */ + private <T> T getOptionalHeader( + Exchange exchange, + String headerName, + Class<T> headerType, + Supplier<T> configurationValue, + String parameterName) { + T value = exchange.getIn().getHeader(headerName, headerType); + if (ObjectHelper.isEmpty(value)) { + value = configurationValue.get(); + LOG.trace("AWS Athena {} is missing, using default one [{}]", parameterName, value); + } + return value; + } + + /** + * Gets a required value from the exchange header, falling back to configuration if not present. + * + * @param exchange the Camel exchange + * @param headerName the header name to check + * @param headerType the expected type + * @param configurationValue supplier for the configuration fallback value + * @param errorMessage error message if value is not found + * @param <T> the value type + * @return the value from header or configuration + * @throws IllegalArgumentException if value is not found in header or configuration + */ + private <T> T getRequiredHeader( + Exchange exchange, + String headerName, + Class<T> headerType, + Supplier<T> configurationValue, + String errorMessage) { + T value = exchange.getIn().getHeader(headerName, headerType); + if (ObjectHelper.isEmpty(value)) { + value = configurationValue.get(); + } + if (ObjectHelper.isEmpty(value)) { + throw new IllegalArgumentException(errorMessage); + } + return value; + } + @Override protected void doStart() throws Exception { // health-check is optional so discover and resolve diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Athena2EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Athena2EndpointBuilderFactory.java index 90d00e3cdb51..afdf7a47dced 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Athena2EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Athena2EndpointBuilderFactory.java @@ -1094,6 +1094,19 @@ public interface Athena2EndpointBuilderFactory { public String awsAthenaMaxResults() { return "CamelAwsAthenaMaxResults"; } + /** + * Whether the response has more results (i.e., is truncated). If true, + * use the NEXT_TOKEN header to fetch the next page. + * + * The option is a: {@code Boolean} type. + * + * Group: getQueryResults listQueryExecutions + * + * @return the name of the header {@code AwsAthenaIsTruncated}. + */ + public String awsAthenaIsTruncated() { + return "CamelAwsAthenaIsTruncated"; + } /** * Include useful trace information at the beginning of queries as an * SQL comment (prefixed with --).
