austin362667 opened a new issue, #37:
URL: https://github.com/apache/datafusion-ray/issues/37
**Describe the bug**

[DataFusion Ray](https://github.com/apache/datafusion-ray) installs [DataFusion Python](https://github.com/apache/datafusion-python) as a dependency after the merge of PR [Remove Session Context](https://github.com/apache/datafusion-ray/commit/b792eb0aaaa7704de5156bb87bc406d831be8fbf#diff-3165d0fca35aeb3c7c29f472edf128187ee5107cad705b253eafe5a5acb4bc35R30). I'm encountering the following bug, and I suspect it's a misuse of the DataFusion Python bindings exposed by PyO3 within DataFusion Ray.

```
Traceback (most recent call last):
  File "/tmp/ray/session_2024-10-21_04-23-14_317497_1/runtime_resources/working_dir_files/_ray_pkg_dcc19145fca26cc6/tips.py", line 55, in <module>
    df_ctx.create_dataframe([ray_results]).show()
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.11/site-packages/datafusion/context.py", line 554, in create_dataframe
    return DataFrame(self.ctx.create_dataframe(partitions, name, schema))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: argument 'partitions': 'RecordBatch' object cannot be converted to 'PyList'
```
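For context, `SessionContext.create_dataframe` in DataFusion Python takes `partitions` as a list of partitions, where each partition is itself a list of `pyarrow.RecordBatch` objects. Below is a minimal standalone sketch of what I believe the expected shape is; the `ctx` and toy `batch` here are purely illustrative and are not the DataFusion Ray code path:

```
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
# Toy batch for illustration only; not the data produced by DataFusion Ray.
batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})

# Accepted shape: outer list = partitions, inner list = batches per partition.
ctx.create_dataframe([[batch]]).show()

# A flat list of RecordBatches (one nesting level too shallow) raises the
# same error as in the traceback above:
#   TypeError: argument 'partitions': 'RecordBatch' object cannot be converted to 'PyList'
# ctx.create_dataframe([batch])
```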
**To Reproduce**

In [DataFusion Ray](https://github.com/apache/datafusion-ray), [set up the cluster](https://github.com/apache/datafusion-ray/pull/23#pullrequestreview-2352261550) and submit the example script to the Ray cluster:

```
export RAY_ADDRESS="http://127.0.0.1:8265"
ray job submit --working-dir ./examples/ -- python3 tips.py
```

**Expected behavior**

The job should complete successfully:

```
Job submission server address: http://127.0.0.1:8265
2024-10-07 22:59:35,002 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_3dd815f5033d4050.zip.
2024-10-07 22:59:35,003 INFO packaging.py:531 -- Creating a file package for local directory './examples/'.

-------------------------------------------------------
Job 'raysubmit_XeVwVxmEhvpayxXW' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_XeVwVxmEhvpayxXW
  Query the status of the job:
    ray job status raysubmit_XeVwVxmEhvpayxXW
  Request the job to be stopped:
    ray job stop raysubmit_XeVwVxmEhvpayxXW

Tailing logs until the job exits (disable with --no-wait):
2024-10-07 07:59:35,419 INFO job_manager.py:529 -- Runtime env is setting up.
2024-10-07 07:59:36,933 INFO worker.py:1474 -- Using address 192.168.194.51:6379 set in the environment variable RAY_ADDRESS
2024-10-07 07:59:36,933 INFO worker.py:1614 -- Connecting to existing Ray cluster at address: 192.168.194.51:6379...
2024-10-07 07:59:36,938 INFO worker.py:1790 -- Connected to Ray cluster. View the dashboard at http://192.168.194.51:8265
Planning select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker
Query stage #0:
RayShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "sex", index: 0 }, Column { name: "smoker", index: 1 }], 2))
  AggregateExec: mode=Partial, gby=[sex@2 as sex, smoker@3 as smoker], aggr=[avg(tips.tip / tips.total_bill)]
    ParquetExec: file_groups={1 group: [[tmp/ray/session_2024-10-07_07-57-04_914630_1/runtime_resources/working_dir_files/_ray_pkg_3dd815f5033d4050/tips.parquet]]}, projection=[total_bill, tip, sex, smoker]
Query stage #1:
CoalescePartitionsExec
  ProjectionExec: expr=[sex@0 as sex, smoker@1 as smoker, avg(tips.tip / tips.total_bill)@2 as tip_pct]
    AggregateExec: mode=FinalPartitioned, gby=[sex@0 as sex, smoker@1 as smoker], aggr=[avg(tips.tip / tips.total_bill)]
      CoalesceBatchesExec: target_batch_size=16384
        RayShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "sex", index: 0 }, Column { name: "smoker", index: 1 }], 2))
Scheduling query stage #0 with 1 input partitions and 2 output partitions
Forcing reduce stage concurrency from 2 to 1
Scheduling query stage #1 with 1 input partitions and 1 output partitions
(execute_query_partition pid=943) RayShuffleWriterExec[stage=0].execute(input_partition=0)
(execute_query_partition pid=943) RayShuffleWriterExec[stage=0] Finished writing shuffle partition 0. Batches: 1. Rows: 2. Bytes: 720.
(execute_query_partition pid=943) RayShuffleWriterExec[stage=0] Finished writing shuffle partition 1. Batches: 1. Rows: 2. Bytes: 720.
(execute_query_partition pid=943) {"cat": "0-0", "name": "0-0", "pid": "192.168.194.51", "tid": 943, "ts": 1728313178020260, "dur": 3577, "ph": "X"},:task_name:execute_query_partition
(execute_query_partition pid=943) RayShuffleReaderExec[stage=0].execute(input_partition=1) with 1 shuffle inputs
(execute_query_partition pid=943) RayShuffleReaderExec[stage=0].execute(input_partition=0) with 1 shuffle inputs
(execute_query_partition pid=943) {"cat": "1-0", "name": "1-0", "pid": "192.168.194.51", "tid": 943, "ts": 1728313178027011, "dur": 769, "ph": "X"},
      sex smoker   tip_pct
0    Male     No  0.160669
1  Female    Yes  0.182150
      sex smoker   tip_pct
0  Female     No  0.156921
1    Male    Yes  0.152771

------------------------------------------
Job 'raysubmit_XeVwVxmEhvpayxXW' succeeded
------------------------------------------
```

**Additional context**

Add any other context about the problem here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
