SemyonSinchenko commented on issue #957:
URL: https://github.com/apache/datafusion-comet/issues/957#issuecomment-2370555748

   I took a look at the Spark code, and this is what I found:
   1. Conversion from rows to Arrow batches is done in `org.apache.spark.sql.execution.python.BasicPythonArrowInput`.
   2. The reverse conversion from Arrow batches to rows is done in `org.apache.spark.sql.execution.python.BasicPythonArrowOutput`.
   
   Both are `private[python]` in Spark. As far as I can tell, `BaseArrowPythonRunner` expects `InternalRow` as input, converts it to Arrow, and returns `org.apache.spark.sql.vectorized.ColumnarBatch`.
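   
   For reference, the shape of the Spark-side types, as I understand them, is roughly the following (paraphrased for illustration; the exact signatures differ between Spark versions):
   
   ```scala
   // Paraphrased from Spark internals, for illustration only; not exact signatures.
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.vectorized.ColumnarBatch

   // BasePythonRunner[IN, OUT] drives a Python worker process: it writes IN
   // values to the worker and reads OUT values back. ArrowPythonRunner mixes
   // in the private[python] Arrow traits mentioned above:
   //
   //   class ArrowPythonRunner(...)
   //     extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]
   //     with BasicPythonArrowInput   // InternalRow -> Arrow batches to the worker
   //     with BasicPythonArrowOutput  // Arrow batches from the worker -> ColumnarBatch
   ```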
   
   What I am thinking about now is implementing `CometArrowPythonRunner extends BasePythonRunner[CometColumnarBatch, CometColumnarBatch]`, where `CometColumnarBatch` is just a Scala wrapper over Comet memory. There are a lot of configs handled by Spark, like `spark.pyspark.driver.python` and `spark.pyspark.python`. These configs define which Python interpreter is used and what is included in `PYTHONPATH`, which lets users add their own dependencies to Python vectorized UDFs. If execution happened in Rust instead, all of these configs would have to be handled by Comet, including managing Python virtual environments, collecting Python metrics, etc.
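   
   To illustrate the kind of plumbing that would have to move into Comet, here is a minimal sketch of interpreter/path resolution (my simplification, not Spark's exact resolution order, which lives around `PythonWorkerFactory` and also consults `PYSPARK_PYTHON`):
   
   ```scala
   // A sketch of what Comet would need to replicate if Python execution moved
   // to Rust. Simplified assumption, not Spark's exact resolution logic.
   import org.apache.spark.SparkConf

   def resolvePythonExec(conf: SparkConf): String =
     conf.getOption("spark.pyspark.python")   // executor-side interpreter config
       .orElse(sys.env.get("PYSPARK_PYTHON")) // environment-variable fallback
       .getOrElse("python3")                  // last-resort default

   def resolvePythonPath(userDeps: Seq[String]): String =
     (userDeps ++ sys.env.get("PYTHONPATH"))  // user dependencies + inherited path
       .mkString(java.io.File.pathSeparator)
   ```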
   
   So, the possible solution I see is the following (a rough sketch follows the list):
   - Implement `CometColumnarBatch extends InternalRow` that wraps Comet data without copying
   - Add the ability for a child node in the plan to request a `CometColumnarBatch` from its parent, and the ability for Comet nodes to work with `CometColumnarBatch`
   - Implement `CometArrowPythonRunner extends BasePythonRunner[CometColumnarBatch, CometColumnarBatch]`, differing only in how input and output are processed
   - Implement `MapInBatchExecComet` that implements `doExecute` and internally creates a `CometArrowPythonRunner` instead of an `ArrowPythonRunner`
   - Add a rule to the Comet plugin that replaces `MapInBatchExec` with `MapInBatchExecComet` where possible, without falling back to Spark
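   
   As a very rough skeleton of the last two bullets (all Comet-side names are hypothetical placeholders from this proposal, and the constructor arguments are simplified):
   
   ```scala
   // Hypothetical sketch only: MapInBatchExecComet and ReplaceMapInBatch do not
   // exist in Comet; the names follow the proposal above.
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
   import org.apache.spark.sql.catalyst.rules.Rule
   import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
   import org.apache.spark.sql.execution.python.MapInPandasExec

   // Replacement operator: same contract as Spark's MapInBatchExec impls, but
   // doExecute would drive a CometArrowPythonRunner over CometColumnarBatch
   // (not shown here).
   case class MapInBatchExecComet(func: Expression, output: Seq[Attribute], child: SparkPlan)
       extends UnaryExecNode {
     override protected def doExecute(): RDD[InternalRow] = ??? // run CometArrowPythonRunner
     override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
       copy(child = newChild)
   }

   // Plugin rule: rewrite the Spark operator into the Comet one when supported,
   // with no fallback path back to Spark. PythonMapInArrowExec (the other
   // MapInBatchExec implementation) would be matched the same way.
   object ReplaceMapInBatch extends Rule[SparkPlan] {
     override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
       case MapInPandasExec(func, output, child) if isSupported(output) =>
         MapInBatchExecComet(func, output, child)
     }
     private def isSupported(output: Seq[Attribute]): Boolean = ??? // e.g. type checks
   }
   ```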
   
   It seems to me that reusing how Spark handles Python UDFs would be easier than implementing it from scratch with DataFusion. But I'm not 100% sure.

