[ https://issues.apache.org/jira/browse/CAMEL-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17431169#comment-17431169 ]

Claus Ibsen commented on CAMEL-17110:
-------------------------------------

After digging a bit, it looks like the aws-s3 consumer has a race condition: a 
previous poll picked up a file and sent it to Kafka, where it is processed 
asynchronously, so the current thread becomes idle. On the next poll, the 
consumer thread "sees" the same file from the previous poll, which the Kafka 
producer is still sending. When the Kafka producer finishes sending, its 
on-completion handler deletes the S3 file, and it does so at the same time as 
the other thread is trying to process the duplicate, which by then has been 
deleted.

What the S3 consumer lacks is an in-progress repository to keep track of 
currently in-flight messages and their files. The camel-file component has such 
a feature.

We can add a similar in-flight repository to the S3 consumer, like the one in 
camel-file. It is a bit of work, as you need to ensure the in-flight repository 
is kept correctly up to date when there are exceptions, when a batch breaks 
before all messages are done, etc. But basically copy/paste from camel-file 
where you can.

> Camel-Kamelets: While using AWS S3 source noticed files were deleted before 
> being consumed at all
> -------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-17110
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17110
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 3.11.3
>            Reporter: Andrea Cosentino
>            Priority: Major
>
> {code}
> - route:
>     from:
>       uri: "kamelet:aws-s3-source"
>       parameters:
>         bucketNameOrArn: "camel-kafka-connector"
>         accessKey: "access"
>         secretKey: "secret"
>         region: "eu-west-1"
>     steps:
>       - to: 
>           uri: "kamelet:kafka-not-secured-sink"
>           parameters:
>             brokers: "localhost:9092"
>             topic: "s3-source-topic"
> {code}
> In the log, with trace enabled, you may notice something like:
> {code:java}
> 10:14:24.476 [Camel (AWS-S3-To-Kafka) thread #11 - 
> KafkaProducer[s3-source-topic]] TRACE o.a.c.c.aws2.s3.AWS2S3Consumer - 
> Deleted object from bucket camel-kafka-connector with key 
> jkXzIEbaYyKMTMwGpNHL.txt...
> 10:14:24.491 [Camel (AWS-S3-To-Kafka) thread #0 - 
> aws2-s3://camel-kafka-connector] TRACE s.a.awssdk.auth.signer.Aws4Signer - 
> AWS4 Canonical Request: GET
> /jkXzIEbaYyKMTMwGpNHL.txt
> 10:14:24.582 [Camel (AWS-S3-To-Kafka) thread #0 - 
> aws2-s3://camel-kafka-connector] DEBUG software.amazon.awssdk.request - 
> Received error response: 
> software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified 
> key does not exist. 
> {code}
> The get should happen before the deletion. This is happening only 
> when using Kamelets. It looks like the exchange completed before the get 
> operation was done.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)