SemyonSinchenko opened a new issue, #957:
URL: https://github.com/apache/datafusion-comet/issues/957

   ### What is the problem the feature request solves?
   
   Spark provides multiple ways to run arrow-backed UDFs. The current Spark 3.5 supports `mapInArrow`, and the upcoming 4.0 will also add `applyInArrow`.
   
   My understanding of how this works in Spark under the hood is quite limited, so correct me if I'm wrong. At the moment, if Spark sees a `PythonMapInArrow` node in the plan, it internally converts rows to Arrow batches, which should be a columnar representation of the data.
   
   Here is a minimal example of running `mapInArrow` in Spark 3.4:
   ```python
   import pandas as pd
   import pyarrow as pa
   from pyspark.sql import SparkSession, types as T
   
   
   if __name__ == "__main__":
       spark = SparkSession.builder.getOrCreate()
        iris = pd.read_csv(
            "https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/iris.csv"
        )
        iris.to_parquet("iris.parquet", index=False)

        def arrow_fun(arrow_batch: pa.RecordBatch) -> pa.RecordBatch:
            # Convert the incoming Arrow batch to pandas, compute the new
            # column, and return the result as an Arrow batch again.
            pdf = arrow_batch.to_pandas()
            pdf["avg_length"] = (pdf["sepal_length"] + pdf["petal_length"]) / 2

            return pa.RecordBatch.from_pandas(pdf)
   
       schema = T.StructType(
           [
               T.StructField("sepal_length", T.DoubleType()),
               T.StructField("sepal_width", T.DoubleType()),
               T.StructField("petal_length", T.DoubleType()),
               T.StructField("petal_width", T.DoubleType()),
               T.StructField("species", T.StringType()),
               T.StructField("avg_length", T.DoubleType()),
           ]
       )
   
       
       test_data = spark.read.parquet("iris.parquet")
       new_data = test_data.mapInArrow(arrow_fun, schema)
       new_data.explain(mode="extended")
   ```
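   
   For reference, this is roughly how I enable Comet for the snippet above (a sketch assuming the Comet jar is already on the classpath; the config keys are the ones from the Comet docs):
   ```python
    spark = (
        SparkSession.builder
        # Register the Comet plugin and turn on the Comet scan/exec paths.
        .config("spark.plugins", "org.apache.spark.CometPlugin")
        .config("spark.comet.enabled", "true")
        .config("spark.comet.exec.enabled", "true")
        .getOrCreate()
    )
   ```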
   
   If I run it with Comet enabled, it generates the following physical plan:
   ```
    PythonMapInArrow arrow_fun(sepal_length#0, sepal_width#1, petal_length#2, petal_width#3, species#4)#10, [sepal_length#11, sepal_width#12, petal_length#13, petal_width#14, species#15, avg_length#16]
    +- *(1) ColumnarToRow
       +- CometScan parquet [sepal_length#0,sepal_width#1,petal_length#2,petal_width#3,species#4] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/home/sem/github/tmp/iris.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<sepal_length:double,sepal_width:double,petal_length:double,petal_width:double,species:string>
   ```
   
   If I understand it right, the following happens:
   1. Comet reads the parquet file in columnar form.
   2. Comet converts the arrow-backed columnar data to row-oriented JVM data (the `ColumnarToRow` node).
   3. Spark internally converts the row-oriented JVM data back to arrow-backed columnar data.
   4. This columnar data is passed to `PythonMapInArrow`.
   
   It seems to me that steps 2-3 are redundant: the Arrow batches required for `mapInArrow` could be created directly from the Comet arrow-backed columns, and that operation should be more or less zero-copy. The reverse conversion, from a Spark columnar batch back to a Comet columnar batch, could be zero-copy too, so in theory Comet would not even need to fall back to Spark in this case, right?
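   
   To illustrate what I mean by zero-copy: in pyarrow, re-wrapping existing Arrow arrays in a new batch does not copy the underlying buffers, whereas the pandas round-trip in `arrow_fun` above materializes new ones. A small self-contained check (plain pyarrow, nothing Comet-specific):
   ```python
    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays([pa.array([1.0, 2.0, 3.0])], names=["x"])

    # Building a new batch from the existing arrays is zero-copy: the new
    # batch points at the very same data buffer as the original one.
    # (buffers()[1] is the data buffer of a primitive array.)
    rewrapped = pa.RecordBatch.from_arrays(batch.columns, names=["x"])
    assert (
        rewrapped.column(0).buffers()[1].address
        == batch.column(0).buffers()[1].address
    )
   ```
   My hope is that the Comet-to-Spark columnar hand-off could behave like the `rewrapped` case above rather than like a row round-trip.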
   
   ### Describe the potential solution
   
   I do not know an exact solution; this is mostly a question.
   
   ### Additional context
   
   I'm willing to implement this myself and I'm ready to work on it, but I need guidance and help with the overall design of how it should be done (if it is feasible at all).
   
   Native support of arrow-backed UDFs would open a lot of cool new ways of using Comet. I expect it could give a huge boost to most ML/MLOps tasks that are typically done in Spark via arrow-backed UDFs (`pandas`, `polars`, `pyarrow`, etc.).

