Sharing an example since a few people asked me off-list:

We store the partition details in the read/write nodes of the physical
plan.
They can then be accessed through the plan, e.g. plan.getInputPartitions
or plan.getOutputPartitions, which internally loop through the nodes in
the plan and collect the input and output partition details.

This could easily be extended to a DataFrame method such as
df.getInputPartitions or df.getOutputPartitions.
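
To make the traversal idea concrete, here is a minimal sketch in plain
Python. It is not our actual implementation and does not use any Spark
classes: PlanNode, get_input_partitions and get_output_partitions are
hypothetical stand-ins, and the partition values are made up. It only shows
the "walk the plan nodes and collect what the read/write nodes recorded"
step.

```python
from dataclasses import dataclass, field
from typing import Dict, List


# Hypothetical stand-in for a physical plan node; NOT a Spark class.
@dataclass
class PlanNode:
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    # table name -> partitions touched by this node (empty for non-I/O nodes)
    read_partitions: Dict[str, List[str]] = field(default_factory=dict)
    written_partitions: Dict[str, List[str]] = field(default_factory=dict)


def _collect(node: PlanNode, attr: str, out: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Depth-first walk that merges the per-node partition maps, without duplicates."""
    for table, parts in getattr(node, attr).items():
        bucket = out.setdefault(table, [])
        for part in parts:
            if part not in bucket:
                bucket.append(part)
    for child in node.children:
        _collect(child, attr, out)
    return out


def get_input_partitions(plan: PlanNode) -> Dict[str, List[str]]:
    return _collect(plan, "read_partitions", {})


def get_output_partitions(plan: PlanNode) -> Dict[str, List[str]]:
    return _collect(plan, "written_partitions", {})


# Toy plan for "insert into table_b select * from table_a where ..."
scan = PlanNode(
    "scan table_a",
    read_partitions={"table_a": ["datestr=2024-01-30", "datestr=2024-01-31"]},
)
filt = PlanNode("filter", children=[scan])
insert = PlanNode(
    "insert table_b",
    children=[filt],
    written_partitions={"table_b": ["datestr=2024-01-30", "datestr=2024-01-31"]},
)

print(get_input_partitions(insert))
print(get_output_partitions(insert))
```

Returning a table -> partitions mapping is a choice made for this sketch;
a flat list of (table, partition) pairs would work just as well.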

An example:
df = spark.sql("insert into table_b SELECT * from table_a where datestr > 'dd/mm/yyyy'")
df.show()
inputPartitions = df.getInputPartitions
outputPartitions = df.getOutputPartitions

inputPartitions and outputPartitions now hold the list of tables, and the
partitions in those tables, that the query read from and wrote to. These
can be used to power freshness alerts or any other statistics.
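
As a rough illustration of the freshness-alert use case, a check could look
something like the sketch below. Everything here is an assumption made for
the example: the table -> partitions shape of the two results, the
stale_outputs helper, the last_updated lookup (which in practice would come
from HMS or the storage layer), and the rule that an output partition is
only as fresh as the oldest input partition the query read.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

Partitions = Dict[str, List[str]]  # assumed shape: table name -> partition specs


def stale_outputs(
    input_partitions: Partitions,
    output_partitions: Partitions,
    last_updated: Dict[Tuple[str, str], datetime],
    now: datetime,
    max_age: timedelta,
) -> List[Tuple[str, str]]:
    """Return the (table, partition) pairs written from inputs older than max_age.

    Assumption: every output partition is only as fresh as the oldest
    input partition the query read.
    """
    inputs = [(t, p) for t, parts in input_partitions.items() for p in parts]
    if not inputs:
        return []
    oldest = min(last_updated[key] for key in inputs)
    if now - oldest <= max_age:
        return []
    return [(t, p) for t, parts in output_partitions.items() for p in parts]


# Made-up data: one input partition was last updated 30 hours ago.
now = datetime(2024, 2, 1, 12, 0, tzinfo=timezone.utc)
inputs = {"table_a": ["datestr=2024-01-30", "datestr=2024-01-31"]}
outputs = {"table_b": ["datestr=2024-01-31"]}
updated = {
    ("table_a", "datestr=2024-01-30"): now - timedelta(hours=30),
    ("table_a", "datestr=2024-01-31"): now - timedelta(hours=2),
}

# With a 24-hour threshold the output partition is flagged as stale.
print(stale_outputs(inputs, outputs, updated, now, timedelta(hours=24)))
```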

I'd like to hear from the dev community: would a SPIP be the right route
here?

On Wed, Jan 31, 2024 at 11:45 AM Aditya Sohoni <aditya.soh...@uber.com>
wrote:

> Hello Spark Devs!
>
>
> We are from Uber's Spark team.
>
>
> Our ETL jobs use Spark to read and write from Hive datasets stored in
> HDFS. The freshness of the partition written to depends on the freshness of
> the data in the input partition(s). We monitor this freshness score, so
> that partitions in our critical tables always have fresh data.
>
>
> We are looking for some code/helper function/utility etc. built into the
> Spark engine, through which we can programmatically get the list of
> partitions read and written by an execution.
>
>
> We looked for this in the plan, and our initial code study did not
> point us to any such method. We have been dependent on indirect ways
> like audit logs of storage, HMS, etc. We find them difficult to use and
> scale.
>
>
> However, the Spark code does contain the list of partitions read and
> written. The files below have the partition data for the given file format:
>
> 1. Input partitions from HiveTableScanExec.scala (Text format).
>
> 2. Input partitions from DataSourceScanExec.scala (Parquet/Hudi/ORC).
>
> 3. Output partitions from InsertIntoHiveTable.scala (Text format).
>
> 4. Output partitions from
> InsertIntoHadoopFsRelationCommand.scala (Parquet/Hudi/ORC).
>
>
> We did come up with some code that can help gather this info in a
> programmatically friendly way. We maintained this information in the plan.
> We wrapped the plan with some convenience classes and methods to extract
> the partition details.
>
>
> We felt that such a programmatic interface could be used for more purposes
> as well, like showing in SHS a new set of statistics that can aid in
> troubleshooting.
>
>
> I wanted to know from the Dev Community, is there already something that
> is/was implemented in Spark that can solve our requirement? If not, we
> would love to share how we have implemented this and contribute to the
> community.
>
>
> Regards,
>
> Aditya Sohoni
>
