[
https://issues.apache.org/jira/browse/SPARK-31962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Christopher Highman updated SPARK-31962:
----------------------------------------
Description:
Two new options, _modifiedBefore_ and _modifiedAfter_, are provided, each
expecting a value in 'YYYY-MM-DDTHH:mm:ss' format. _PartitioningAwareFileIndex_
considers these options while checking for files, just before applying
_PathFilters_ such as {{pathGlobFilter}}. A new PathFilter class was derived to
filter file results, and general housekeeping was performed around classes
extending PathFilter. It became apparent that support was needed for multiple
path filters applied together, so logic was introduced for this purpose and the
associated tests were written.
When loading files from a data source, there can oftentimes be thousands of
files within a given path. In many cases I've seen, we want to start loading
from a folder path and ideally begin with files whose modification dates fall
past a certain point. Out of thousands of potential files, only those with
modification dates greater than the specified timestamp would be considered.
This saves significant time automatically and removes considerable complexity
from managing this in code.
*Example Usages*
_Load all CSV files modified after date:_
{{spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()}}
_Load all CSV files modified before date:_
{{spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()}}
_Load all CSV files modified between two dates:_
{{spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()}}
was:
When using structured streaming or just loading from a file data source, I've
encountered a number of occasions where I want to be able to stream from a
folder containing any number of historical files in CSV format. When I start
reading from a folder, however, I might only care about files that were created
after a certain time.
{code:java}
spark.read
.option("header", "true")
.option("delimiter", "\t")
.format("csv")
.load("/mnt/Deltas")
{code}
In
[https://github.com/apache/spark/blob/f3771c6b47d0b3aef10b86586289a1f675c7cfe2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala],
there is a method, _listLeafFiles_, which builds _FileStatus_ objects
containing a _modificationDate_ property. When a filter is applied to the path,
we already iterate over the resulting files; in that case it's trivial to do a
primitive comparison between _modificationDate_ and a date specified via an
option. Because the comparison is between primitives, the added effort is
minimal even when no other filter is specified.
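The primitive comparison described above can be sketched as follows. This is a hypothetical standalone illustration, not Spark's actual PathFilter API: the class name _ModifiedTimeFilter_, its constructor, and the _accept_ method are invented for the sketch, and UTC is assumed when parsing the timestamp options.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch of the proposed filtering: a file passes when its
// modification time falls strictly after "modifiedAfter" and strictly
// before "modifiedBefore"; either bound may be omitted (null).
class ModifiedTimeFilter {
    private final Long afterMillis;   // null = no lower bound
    private final Long beforeMillis;  // null = no upper bound

    ModifiedTimeFilter(String modifiedAfter, String modifiedBefore) {
        this.afterMillis = toMillis(modifiedAfter);
        this.beforeMillis = toMillis(modifiedBefore);
    }

    // Options take 'YYYY-MM-DDTHH:mm:ss'; UTC is assumed for the sketch.
    private static Long toMillis(String ts) {
        if (ts == null) return null;
        return LocalDateTime.parse(ts).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Primitive comparison against a FileStatus-style modification time
    // in epoch milliseconds; this is the cheap per-file check.
    boolean accept(long modificationTimeMillis) {
        if (afterMillis != null && modificationTimeMillis <= afterMillis) return false;
        if (beforeMillis != null && modificationTimeMillis >= beforeMillis) return false;
        return true;
    }
}
```

Since both sides of the check are plain `long` values, evaluating it per file adds negligible cost to a directory listing that is already being iterated.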
An option for specifying a timestamp when loading files from a path would
minimize complexity for consumers who load files or do structured streaming
from a folder path but have no interest in reading what could be thousands of
irrelevant files.
One example could be "_fileModifiedDate_", accepting a UTC datetime like
below.
{code:java}
spark.read
.option("header", "true")
.option("delimiter", "\t")
.option("fileModifiedDate", "2020-05-01T12:00:00")
.format("csv")
.load("/mnt/Deltas")
{code}
If this option is specified, the expected behavior is that files within the
_"/mnt/Deltas/"_ path must have been modified at or after the specified time in
order to be consumed, whether reading files from a folder path or via
structured streaming.
I have unit tests passing under _FileIndexSuite_ in the
_spark.sql.execution.datasources_ package.
+*Consumers seeking to replicate or achieve this behavior*+
*Stack Overflow -(spark structured streaming file source read from a certain
partition onwards)*
[https://stackoverflow.com/questions/58004832/spark-structured-streaming-file-source-read-from-a-certain-partition-onwards]
*Stack Overflow - (Spark Structured Streaming File Source Starting Offset)*
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset/51399134#51399134]
> Provide modifiedAfter and modifiedBefore options when filtering from a
> batch-based file data source
> ---------------------------------------------------------------------------------------------------
>
> Key: SPARK-31962
> URL: https://issues.apache.org/jira/browse/SPARK-31962
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Christopher Highman
> Priority: Minor
>
> Two new options, _modifiedBefore_ and _modifiedAfter_, are provided, each
> expecting a value in 'YYYY-MM-DDTHH:mm:ss' format. _PartitioningAwareFileIndex_
> considers these options while checking for files, just before applying
> _PathFilters_ such as {{pathGlobFilter}}. A new PathFilter class was derived
> to filter file results, and general housekeeping was performed around classes
> extending PathFilter. It became apparent that support was needed for multiple
> path filters applied together, so logic was introduced for this purpose and
> the associated tests were written.
>
> When loading files from a data source, there can oftentimes be thousands of
> files within a given path. In many cases I've seen, we want to start loading
> from a folder path and ideally begin with files whose modification dates fall
> past a certain point. Out of thousands of potential files, only those with
> modification dates greater than the specified timestamp would be considered.
> This saves significant time automatically and removes considerable complexity
> from managing this in code.
>
> *Example Usages*
> _Load all CSV files modified after date:_
> {{spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()}}
> _Load all CSV files modified before date:_
> {{spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()}}
> _Load all CSV files modified between two dates:_
> {{spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()}}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]