[ https://issues.apache.org/jira/browse/SPARK-46361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weichen Xu updated SPARK-46361:
-------------------------------
    Description: 
*Proposed API:*
{code:python}
def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
    """
    Persist the Spark DataFrame as chunks; each chunk is an Arrow batch.
    Return the list of chunk ids.
    This function is only available when called from the Spark driver process.
    """

def read_chunk(chunk_id: str):
    """
    Read a chunk by id and return the Arrow batch data of this chunk.
    You can call this function from the Spark driver, a Spark Python UDF worker,
    a descendant process of the Spark driver, or a descendant process of a
    Spark Python UDF worker.
    """

def unpersist_chunks(chunk_ids: list[str]) -> None:
    """
    Remove chunks by chunk ids.
    This function is only available when called from the Spark driver process.
    """{code}
*Motivation:*

(1)
In Ray on Spark, we want to support loading Ray Data from an arbitrary Spark DataFrame via in-memory conversion.

In Ray on Spark, a Ray datasource read task runs as a child process of a Ray worker node, and the Ray worker node itself is launched as a child process of a PySpark UDF worker.

The proposed API therefore allows a descendant Python process of a PySpark UDF worker to read chunk data of a given Spark DataFrame; based on this, we can achieve an efficient "Spark DataFrame" to "Ray dataset" conversion.
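As a rough sketch (not part of the proposal), a per-chunk read task running in a descendant process of a PySpark UDF worker could look like the following; the module path and the use of pyarrow are assumptions:
{code:python}
# Hypothetical per-chunk read task, e.g. executed inside a Ray Data read task
# that runs as a descendant process of a PySpark UDF worker.
import pyarrow as pa

from pyspark.sql.chunk_api import read_chunk  # hypothetical module path


def read_one_chunk(chunk_id: str) -> pa.Table:
    # read_chunk is assumed to return a pyarrow.RecordBatch; wrapping it in a
    # Table gives a block format that Ray Data can consume directly.
    batch = read_chunk(chunk_id)
    return pa.Table.from_batches([batch])
{code}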

(2)
For the Petastorm Spark dataset converter (see https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html), we can achieve better performance by using the added API.

  was:
*Proposed API:*
{code:python}
def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
    """
    Persist the Spark DataFrame as chunks; each chunk is an Arrow batch.
    Return the list of chunk ids.
    This function is only available when called from the Spark driver process.
    """

def read_chunk(chunk_id: str):
    """
    Read a chunk by id and return the Arrow batch data of this chunk.
    You can call this function from the Spark driver, a Spark Python UDF worker,
    a descendant process of the Spark driver, or a descendant process of a
    Spark Python UDF worker.
    """

def unpersist_chunks(chunk_ids: list[str]) -> None:
    """
    Remove chunks by chunk ids.
    This function is only available when called from the Spark driver process.
    """{code}
*Motivation:*

In Ray on Spark, we want to support loading Ray Data from an arbitrary Spark DataFrame via in-memory conversion.

In Ray on Spark, a Ray datasource read task runs as a child process of a Ray worker node, and the Ray worker node itself is launched as a child process of a PySpark UDF worker.

The proposed API therefore allows a descendant Python process of a PySpark UDF worker to read chunk data of a given Spark DataFrame.


> Add spark dataset chunk read API (python only)
> ----------------------------------------------
>
>                 Key: SPARK-46361
>                 URL: https://issues.apache.org/jira/browse/SPARK-46361
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, Spark Core
>    Affects Versions: 4.0.0
>            Reporter: Weichen Xu
>            Priority: Major
>              Labels: pull-request-available
>
> *Proposed API:*
> {code:python}
> def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
>     """
>     Persist the Spark DataFrame as chunks; each chunk is an Arrow batch.
>     Return the list of chunk ids.
>     This function is only available when called from the Spark driver process.
>     """
>
> def read_chunk(chunk_id: str):
>     """
>     Read a chunk by id and return the Arrow batch data of this chunk.
>     You can call this function from the Spark driver, a Spark Python UDF worker,
>     a descendant process of the Spark driver, or a descendant process of a
>     Spark Python UDF worker.
>     """
>
> def unpersist_chunks(chunk_ids: list[str]) -> None:
>     """
>     Remove chunks by chunk ids.
>     This function is only available when called from the Spark driver process.
>     """{code}
> *Motivation:*
> (1)
> In Ray on Spark, we want to support loading Ray Data from an arbitrary Spark DataFrame via in-memory conversion.
> In Ray on Spark, a Ray datasource read task runs as a child process of a Ray worker node, and the Ray worker node itself is launched as a child process of a PySpark UDF worker.
> The proposed API therefore allows a descendant Python process of a PySpark UDF worker to read chunk data of a given Spark DataFrame; based on this, we can achieve an efficient "Spark DataFrame" to "Ray dataset" conversion.
> (2)
> For the Petastorm Spark dataset converter (see https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html), we can achieve better performance by using the added API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
