[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896994#comment-16896994
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:56 AM:
---------------------------------------------------------------

* *Issue of  FNFE when querying file :* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before file read happens in 
_getBatch,_ Spark catches the FNFE and the following warning is thrown 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource Behavior for this scenario.

 * *Issue of File State Update:* In case of fo FileStreamSource, files are 
listed in _getOffset_ phase and if the contents of the same file change before 
the file is actually read in the _getBatch_ phase, then the initial content of 
the file is lost & not processed. Only the latest content of file available 
during _getBatch_ is processed & similar will be the behavior of SQS Source.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. SQS Source 
also behaves, in the same way, i.e. if an SQS message pertaining to an already 
processed file comes again, it is simply ignored.

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from SQS File Cache is sorted based on timestamp (of file 
updation) to avoid the issue of messages arriving out of order. In case some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee to process files in the order of timestamp 
because of the distributed nature of SQS. 

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.


was (Author: abhishekd0907):
* *Issue of  FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before file read happens in 
_getBatch,_ Spark catches the FNFE and the following warning is thrown 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource Behavior for this scenario.

 * *Issue of File State Update:* In case of fo FileStreamSource, files are 
listed in _getOffset_ phase and if the contents of the same file change before 
the file is actually read in the _getBatch_ phase, then the initial content of 
the file is lost & not processed. Only the latest content of file available 
during _getBatch_ is processed & similar will be the behavior of SQS Source.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. SQS Source 
also behaves, in the same way, i.e. if an SQS message pertaining to an already 
processed file comes again, it is simply ignored.

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from SQS File Cache is sorted based on timestamp (of file 
updation) to avoid the issue of messages arriving out of order. In case some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee to process files in the order of timestamp 
because of the distributed nature of SQS. 

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.

> Faster S3 file Source for Structured Streaming with SQS
> -------------------------------------------------------
>
>                 Key: BAHIR-213
>                 URL: https://issues.apache.org/jira/browse/BAHIR-213
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.4.0
>            Reporter: Abhishek Dixit
>            Priority: Major
>
> Using FileStreamSource to read files from a S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS) which lets you find 
> new files written to S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notification to an Amazon SQS Queue on 
> Object Create / Object Delete events. For details see AWS documentation here 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to S3 bucket by reading 
> notifications from SQS queue instead of listing files every microbatch.
> I hope to contribute changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi]  
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to