This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git


The following commit(s) were added to refs/heads/main by this push:
     new fe86387  Update packaging and documentation (#8)
fe86387 is described below

commit fe8638739357aaa6bfc646f845ef7cc661070541
Author: Austin Liu <[email protected]>
AuthorDate: Wed Oct 2 21:02:48 2024 +0800

    Update packaging and documentation (#8)
    
    * [Docs] Update ReadMe
    
    Signed-off-by: Austin Liu <[email protected]>
    
    * [Chores] Update packagings
    
    Signed-off-by: Austin Liu <[email protected]>
    
    Format
    
    Signed-off-by: Austin Liu <[email protected]>
    
    * Fix tense
    
    Signed-off-by: Austin Liu <[email protected]>
    
    ---------
    
    Signed-off-by: Austin Liu <[email protected]>
---
 Cargo.lock                                       | 38 ++++++++++++------------
 Cargo.toml                                       | 10 +++----
 README.md                                        | 26 +++++++++-------
 {raysql => datafusion_ray}/__init__.py           |  2 +-
 {raysql => datafusion_ray}/context.py            | 14 ++++-----
 {raysql => datafusion_ray}/main.py               |  9 ++----
 {raysql => datafusion_ray}/ray_utils.py          |  0
 {raysql => datafusion_ray}/tests/test_context.py |  2 +-
 pyproject.toml                                   |  6 ++--
 requirements-in.txt                              |  4 +--
 src/lib.rs                                       |  2 +-
 src/query_stage.rs                               |  2 +-
 12 files changed, 58 insertions(+), 57 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 36b824c..aa6cdff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1111,6 +1111,25 @@ dependencies = [
  "strum 0.26.3",
 ]
 
+[[package]]
+name = "datafusion_ray"
+version = "0.6.0"
+dependencies = [
+ "datafusion",
+ "datafusion-proto",
+ "datafusion-python",
+ "futures",
+ "glob",
+ "log",
+ "prost 0.12.6",
+ "prost-types 0.12.6",
+ "pyo3",
+ "rustc_version",
+ "tokio",
+ "tonic-build",
+ "uuid",
+]
+
 [[package]]
 name = "digest"
 version = "0.10.7"
@@ -2442,25 +2461,6 @@ dependencies = [
  "getrandom",
 ]
 
-[[package]]
-name = "raysql"
-version = "0.6.0"
-dependencies = [
- "datafusion",
- "datafusion-proto",
- "datafusion-python",
- "futures",
- "glob",
- "log",
- "prost 0.12.6",
- "prost-types 0.12.6",
- "pyo3",
- "rustc_version",
- "tokio",
- "tonic-build",
- "uuid",
-]
-
 [[package]]
 name = "redox_syscall"
 version = "0.5.4"
diff --git a/Cargo.toml b/Cargo.toml
index f1cc533..081520f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,8 +16,8 @@
 # under the License.
 
 [package]
-name = "raysql"
-description = "RaySQL: DataFusion on Ray"
+name = "datafusion_ray"
+description = "DataFusion on Ray"
 homepage = "https://github.com/datafusion-contrib/ray-sql";
 repository = "https://github.com/datafusion-contrib/ray-sql";
 authors = ["Andy Grove <[email protected]>", "Frank Luan 
<[email protected]>"]
@@ -38,7 +38,7 @@ log = "0.4"
 prost = "0.12"
 prost-types = "0.12"
 pyo3 = { version = "0.21", features = ["extension-module", "abi3", 
"abi3-py38"] }
-tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", 
"sync"] }
+tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", 
"sync"] }
 uuid = "1.2"
 
 [build-dependencies]
@@ -46,11 +46,11 @@ rustc_version = "0.4.0"
 tonic-build = { version = "0.8", default-features = false, features = 
["transport", "prost"] }
 
 [lib]
-name = "raysql"
+name = "datafusion_ray"
 crate-type = ["cdylib", "rlib"]
 
 [package.metadata.maturin]
-name = "raysql._raysql_internal"
+name = "datafusion_ray._datafusion_ray_internal"
 
 [profile.release]
 codegen-units = 1
diff --git a/README.md b/README.md
index ebd7803..5aa86e0 100644
--- a/README.md
+++ b/README.md
@@ -17,11 +17,13 @@
   under the License.
 -->
 
-# datafusion-ray: DataFusion on Ray
+# DataFusion on Ray
 
-This is a research project to evaluate performing distributed SQL queries from 
Python, using
+> This was originally a research project, donated from 
[ray-sql](https://github.com/datafusion-contrib/ray-sql), to evaluate performing 
distributed SQL queries from Python, using
 [Ray](https://www.ray.io/) and 
[DataFusion](https://github.com/apache/arrow-datafusion).
 
+DataFusion Ray is a distributed SQL query engine powered by the Rust 
implementation of [Apache Arrow](https://arrow.apache.org/), [Apache 
DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/).
+
 ## Goals
 
 - Demonstrate how easily new systems can be built on top of DataFusion. See 
the [design documentation](./docs/README.md)
@@ -31,7 +33,9 @@ This is a research project to evaluate performing distributed 
SQL queries from P
 
 ## Non Goals
 
-- Build and support a production system.
+- Re-build the cluster scheduling systems as 
[Ballista](https://datafusion.apache.org/ballista/) did.
+  - Ballista is extremely complex and utilizing Ray feels like it abstracts 
some of that complexity away.
+  - DataFusion Ray delegates cluster management to Ray.
 
 ## Example
 
@@ -42,7 +46,7 @@ import os
 import pandas as pd
 import ray
 
-from raysql import RaySqlContext
+from datafusion_ray import RaySqlContext
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 
@@ -64,7 +68,7 @@ for record_batch in result_set:
 
 ## Status
 
-- RaySQL can run all queries in the TPC-H benchmark
+- DataFusion Ray can run all queries in the TPC-H benchmark
 
 ## Features
 
@@ -73,29 +77,29 @@ for record_batch in result_set:
 
 ## Limitations
 
-- Requires a shared file system currently
+- Requires a shared file system currently. Check details 
[here](./docs/README.md#distributed-shuffle).
 
 ## Performance
 
-This chart shows the performance of RaySQL compared to Apache Spark for
+This chart shows the performance of DataFusion Ray compared to Apache Spark for
 [SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set 
(10GB), running on a desktop (Threadripper
-with 24 physical cores). Both RaySQL and Spark are configured with 24 
executors.
+with 24 physical cores). Both DataFusion Ray and Spark are configured with 24 
executors.
 
 ### Overall Time
 
-RaySQL is ~1.9x faster overall for this scale factor and environment with 
disk-based shuffle.
+DataFusion Ray is ~1.9x faster overall for this scale factor and environment 
with disk-based shuffle.
 
 ![SQLBench-H Total](./docs/sqlbench-h-total.png)
 
 ### Per Query Time
 
-Spark is much faster on some queries, likely due to broadcast exchanges, which 
RaySQL hasn't implemented yet.
+Spark is much faster on some queries, likely due to broadcast exchanges, which 
DataFusion Ray hasn't implemented yet.
 
 ![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png)
 
 ### Performance Plan
 
-I'm planning on experimenting with the following changes to improve 
performance:
+We plan to experiment with the following changes to improve performance:
 
 - Make better use of Ray futures to run more tasks in parallel
 - Use Ray object store for shuffle data transfer to reduce disk I/O cost
diff --git a/raysql/__init__.py b/datafusion_ray/__init__.py
similarity index 96%
rename from raysql/__init__.py
rename to datafusion_ray/__init__.py
index b608318..c8bae17 100644
--- a/raysql/__init__.py
+++ b/datafusion_ray/__init__.py
@@ -20,7 +20,7 @@ try:
 except ImportError:
     import importlib_metadata
 
-from ._raysql_internal import (
+from ._datafusion_ray_internal import (
     Context,
     ExecutionGraph,
     QueryStage,
diff --git a/raysql/context.py b/datafusion_ray/context.py
similarity index 95%
rename from raysql/context.py
rename to datafusion_ray/context.py
index c44de4c..bd3aee3 100644
--- a/raysql/context.py
+++ b/datafusion_ray/context.py
@@ -23,8 +23,8 @@ from typing import Iterable
 import pyarrow as pa
 import ray
 
-import raysql
-from raysql import Context, ExecutionGraph, QueryStage
+import datafusion_ray
+from datafusion_ray import Context, ExecutionGraph, QueryStage
 from typing import List
 
 def schedule_execution(
@@ -73,7 +73,7 @@ def schedule_execution(
         return ids, futures
 
     # schedule the actual execution workers
-    plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
+    plan_bytes = 
datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
     futures = []
     opt = {}
     opt["resources"] = {"worker": 1e-3}
@@ -153,7 +153,7 @@ def execute_query_stage(
         ray.get([f for _, lst in child_outputs for f in lst])
 
     # schedule the actual execution workers
-    plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
+    plan_bytes = 
datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
     futures = []
     opt = {}
     opt["resources"] = {"worker": 1e-3}
@@ -179,7 +179,7 @@ def execute_query_partition(
     *input_partitions: list[pa.RecordBatch],
 ) -> Iterable[pa.RecordBatch]:
     start_time = time.time()
-    plan = raysql.deserialize_execution_plan(plan_bytes)
+    plan = datafusion_ray.deserialize_execution_plan(plan_bytes)
     # print(
     #     "Worker executing plan {} partition #{} with shuffle inputs 
{}".format(
     #         plan.display(),
@@ -193,7 +193,7 @@ def execute_query_partition(
     # This is delegating to DataFusion for execution, but this would be a good 
place
     # to plug in other execution engines by translating the plan into another 
engine's plan
     # (perhaps via Substrait, once DataFusion supports converting a physical 
plan to Substrait)
-    ret = raysql.execute_partition(plan, part, partitions)
+    ret = datafusion_ray.execute_partition(plan, part, partitions)
     duration = time.time() - start_time
     event = {
         "cat": f"{stage_id}-{part}",
@@ -238,7 +238,7 @@ class RaySqlContext:
         else:
             # serialize the query stages and store in Ray object store
             query_stages = [
-                raysql.serialize_execution_plan(
+                datafusion_ray.serialize_execution_plan(
                     graph.get_query_stage(i).get_execution_plan()
                 )
                 for i in range(final_stage_id + 1)
diff --git a/raysql/main.py b/datafusion_ray/main.py
similarity index 95%
rename from raysql/main.py
rename to datafusion_ray/main.py
index e9fe382..9e9b97a 100644
--- a/raysql/main.py
+++ b/datafusion_ray/main.py
@@ -20,18 +20,15 @@ import os
 
 from pyarrow import csv as pacsv
 import ray
-from raysql import RaySqlContext
+from datafusion_ray import RaySqlContext
 
 NUM_CPUS_PER_WORKER = 8
 
-SF = 10
+SF = 1
 DATA_DIR = f"/mnt/data0/tpch/sf{SF}-parquet"
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 QUERIES_DIR = os.path.join(SCRIPT_DIR, f"../sqlbench-h/queries/sf={SF}")
 RESULTS_DIR = f"results-sf{SF}"
-TRUTH_DIR = (
-    
"/home/ubuntu/raysort/ray-sql/sqlbench-runners/spark/{RESULTS_DIR}/{RESULTS_DIR}"
-)
 
 
 def setup_context(use_ray_shuffle: bool, num_workers: int = 2) -> 
RaySqlContext:
@@ -104,7 +101,7 @@ def compare(q: int):
 
 
 def tpch_bench():
-    ray.init("auto")
+    ray.init(resources={"worker": 1})
     num_workers = int(ray.cluster_resources().get("worker", 1)) * 
NUM_CPUS_PER_WORKER
     use_ray_shuffle = False
     ctx = setup_context(use_ray_shuffle, num_workers)
diff --git a/raysql/ray_utils.py b/datafusion_ray/ray_utils.py
similarity index 100%
rename from raysql/ray_utils.py
rename to datafusion_ray/ray_utils.py
diff --git a/raysql/tests/test_context.py b/datafusion_ray/tests/test_context.py
similarity index 96%
rename from raysql/tests/test_context.py
rename to datafusion_ray/tests/test_context.py
index e1cc97c..d138620 100644
--- a/raysql/tests/test_context.py
+++ b/datafusion_ray/tests/test_context.py
@@ -16,7 +16,7 @@
 # under the License.
 
 import pytest
-from raysql import Context
+from datafusion_ray import Context
 
 def test():
     ctx = Context(1, False)
diff --git a/pyproject.toml b/pyproject.toml
index 432bd4f..7a53bcd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,11 +16,11 @@
 # under the License.
 
 [build-system]
-requires = ["maturin>=0.14,<0.15"]
+requires = ["maturin>=0.14,<1.7.4"]
 build-backend = "maturin"
 
 [project]
-name = "raysql"
+name = "datafusion-ray"
 requires-python = ">=3.7"
 classifiers = [
     "Programming Language :: Rust",
@@ -30,4 +30,4 @@ classifiers = [
 
 
 [tool.maturin]
-module-name = "raysql._raysql_internal"
+module-name = "datafusion_ray._datafusion_ray_internal"
diff --git a/requirements-in.txt b/requirements-in.txt
index b0ba082..166b032 100644
--- a/requirements-in.txt
+++ b/requirements-in.txt
@@ -1,11 +1,11 @@
 black
 flake8
 isort
-maturin[patchelf]
+maturin
 mypy
 numpy
 pyarrow
 pytest
-ray==2.3.0
+ray==2.37.0
 toml
 importlib_metadata; python_version < "3.8"
diff --git a/src/lib.rs b/src/lib.rs
index df53b52..4436ac4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,7 +31,7 @@ pub mod utils;
 
 /// A Python module implemented in Rust.
 #[pymodule]
-fn _raysql_internal(_py: Python, m: &PyModule) -> PyResult<()> {
+fn _datafusion_ray_internal(_py: Python, m: &PyModule) -> PyResult<()> {
     // register classes that can be created directly from Python code
     m.add_class::<context::PyContext>()?;
     m.add_class::<planner::PyExecutionGraph>()?;
diff --git a/src/query_stage.rs b/src/query_stage.rs
index 1ada967..2a2f7af 100644
--- a/src/query_stage.rs
+++ b/src/query_stage.rs
@@ -24,7 +24,7 @@ use datafusion_python::physical_plan::PyExecutionPlan;
 use pyo3::prelude::*;
 use std::sync::Arc;
 
-#[pyclass(name = "QueryStage", module = "raysql", subclass)]
+#[pyclass(name = "QueryStage", module = "datafusion_ray", subclass)]
 pub struct PyQueryStage {
     stage: Arc<QueryStage>,
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to