[ 
https://issues.apache.org/jira/browse/SPARK-31962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Highman updated SPARK-31962:
----------------------------------------
    Description: 
Two new options, _modifiedBefore_ and _modifiedAfter_, are provided, each expecting a 
value in 'YYYY-MM-DDTHH:mm:ss' format. _PartitioningAwareFileIndex_ considers 
these options while checking for files, just before applying 
_PathFilters_ such as {{pathGlobFilter}}. A new PathFilter class was derived to 
filter file results, and general housekeeping was performed around the classes 
extending PathFilter. It became apparent that support was needed for applying 
multiple path filters together, so composition logic was introduced and the 
associated tests were written.
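The filter composition described above can be sketched in plain Scala. This is an illustrative sketch only; the trait and class names (_PathFilter_, _ModifiedAfterFilter_, _ModifiedBeforeFilter_, _CompositePathFilter_) are hypothetical stand-ins rather than the actual Spark internals. Each filter implements a common trait, and a composite applies all of them, accepting a file only when every filter does:

```scala
import java.time.LocalDateTime
import java.time.ZoneOffset

// Illustrative stand-in for a path filter keyed on modification time (epoch millis).
trait PathFilter {
  def accept(path: String, modificationTime: Long): Boolean
}

// Accepts only files modified strictly after the given timestamp.
final class ModifiedAfterFilter(thresholdMillis: Long) extends PathFilter {
  def accept(path: String, modificationTime: Long): Boolean =
    modificationTime > thresholdMillis
}

// Accepts only files modified strictly before the given timestamp.
final class ModifiedBeforeFilter(thresholdMillis: Long) extends PathFilter {
  def accept(path: String, modificationTime: Long): Boolean =
    modificationTime < thresholdMillis
}

// Composes any number of filters: a file survives only if all filters accept it.
final class CompositePathFilter(filters: Seq[PathFilter]) extends PathFilter {
  def accept(path: String, modificationTime: Long): Boolean =
    filters.forall(_.accept(path, modificationTime))
}

object FilterDemo {
  // Interpret a 'YYYY-MM-DDTHH:mm:ss' string as a UTC instant in epoch millis.
  def toMillis(ts: String): Long =
    LocalDateTime.parse(ts).toEpochSecond(ZoneOffset.UTC) * 1000L

  def main(args: Array[String]): Unit = {
    val composite = new CompositePathFilter(Seq(
      new ModifiedAfterFilter(toMillis("2019-01-15T05:00:00")),
      new ModifiedBeforeFilter(toMillis("2020-06-15T05:00:00"))))
    println(composite.accept("/mnt/Deltas/a.csv", toMillis("2019-06-01T00:00:00")))
  }
}
```

With this shape, adding a further filter (for example a glob filter) is just another element in the sequence passed to the composite.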

 

When loading files from a data source, there can often be thousands of 
files within a given path. In many cases, we want to start 
loading from a folder path and only consider files whose 
modification dates fall past a certain point. Out of thousands of 
candidate files, only those with modification dates greater than the 
specified timestamp would be read. This saves significant time 
and removes the complexity of managing the filtering in code.

 

*Example Usages*
_Load all CSV files modified after date:_
{{spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()}}

_Load all CSV files modified before date:_
{{spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()}}

_Load all CSV files modified between two dates:_
{{spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()}}

 

  was:
When using structured streaming or just loading from a file data source, I've 
encountered a number of occasions where I want to be able to stream from a 
folder containing any number of historical files in CSV format.  When I start 
reading from a folder, however, I might only care about files that were created 
after a certain time.
{code:java}
spark.read
     .option("header", "true")
     .option("delimiter", "\t")
     .format("csv")
     .load("/mnt/Deltas")
{code}
In 
[https://github.com/apache/spark/blob/f3771c6b47d0b3aef10b86586289a1f675c7cfe2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala],
 there is a method, _listLeafFiles_, which builds _FileStatus_ objects 
that carry a modification time.  When a filter is applied to the path, we 
already iterate over the resulting files, so it is trivial to add a primitive 
comparison between the modification time and a date supplied through an 
option.  Because the check is a primitive comparison, it costs less than 
applying a path filter by itself.

Providing an option to specify a timestamp when loading files from a path 
would minimize complexity for consumers who load files or use structured 
streaming from a folder path but have no interest in reading what could be 
thousands of irrelevant files.

One example could be "_fileModifiedDate_", accepting a UTC datetime as 
below.
{code:java}
spark.read
     .option("header", "true")
     .option("delimiter", "\t")
     .option("fileModifiedDate", "2020-05-01T12:00:00")
     .format("csv")
     .load("/mnt/Deltas")
{code}
If this option is specified, files within the _/mnt/Deltas/_ path must have 
been modified at or later than the specified time in order to be consumed, 
whether reading files from a folder path or via structured streaming.
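The at-or-later semantics above can be sketched without any Spark dependency. The names here (_FileEntry_, _filterByModifiedDate_) are hypothetical stand-ins for the listing that _listLeafFiles_ produces: parse the option value as a UTC timestamp and keep only files whose modification time is greater than or equal to it.

```scala
import java.time.LocalDateTime
import java.time.ZoneOffset

object ModifiedDateFilterSketch {
  // Stand-in for the modification time carried by each FileStatus (epoch millis).
  final case class FileEntry(path: String, modificationTime: Long)

  // Interpret a 'YYYY-MM-DDTHH:mm:ss' option value as a UTC instant.
  def parseUtcMillis(value: String): Long =
    LocalDateTime.parse(value).toEpochSecond(ZoneOffset.UTC) * 1000L

  // Keep files modified at or after the option value ("at or later than").
  def filterByModifiedDate(files: Seq[FileEntry], option: String): Seq[FileEntry] = {
    val threshold = parseUtcMillis(option)
    files.filter(_.modificationTime >= threshold)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FileEntry("/mnt/Deltas/old.csv", parseUtcMillis("2020-04-30T23:59:59")),
      FileEntry("/mnt/Deltas/new.csv", parseUtcMillis("2020-05-01T12:00:00")))
    // A file modified exactly at the threshold is kept; older files are dropped.
    println(filterByModifiedDate(files, "2020-05-01T12:00:00").map(_.path))
  }
}
```

Because the comparison happens on epoch milliseconds, a file whose modification time equals the option value is consumed, matching the "at or later than" wording above.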

 I have unit tests passing under _FileIndexSuite_ in the 
_spark.sql.execution.datasources_ package.

 

+*Consumers seeking to replicate or achieve this behavior*+

*Stack Overflow -(spark structured streaming file source read from a certain 
partition onwards)*
[https://stackoverflow.com/questions/58004832/spark-structured-streaming-file-source-read-from-a-certain-partition-onwards]

*Stack Overflow - (Spark Structured Streaming File Source Starting Offset)*
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset/51399134#51399134]


> Provide modifiedAfter and modifiedBefore options when filtering from a 
> batch-based file data source
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-31962
>                 URL: https://issues.apache.org/jira/browse/SPARK-31962
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Christopher Highman
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
