This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git
The following commit(s) were added to refs/heads/main by this push:
new fe86387 Update packaging and documentation (#8)
fe86387 is described below
commit fe8638739357aaa6bfc646f845ef7cc661070541
Author: Austin Liu <[email protected]>
AuthorDate: Wed Oct 2 21:02:48 2024 +0800
Update packaging and documentation (#8)
* [Docs] Update ReadMe
Signed-off-by: Austin Liu <[email protected]>
* [Chores] Update packagings
Signed-off-by: Austin Liu <[email protected]>
Format
Signed-off-by: Austin Liu <[email protected]>
* Fix tense
Signed-off-by: Austin Liu <[email protected]>
---------
Signed-off-by: Austin Liu <[email protected]>
---
Cargo.lock | 38 ++++++++++++------------
Cargo.toml | 10 +++----
README.md | 26 +++++++++-------
{raysql => datafusion_ray}/__init__.py | 2 +-
{raysql => datafusion_ray}/context.py | 14 ++++-----
{raysql => datafusion_ray}/main.py | 9 ++----
{raysql => datafusion_ray}/ray_utils.py | 0
{raysql => datafusion_ray}/tests/test_context.py | 2 +-
pyproject.toml | 6 ++--
requirements-in.txt | 4 +--
src/lib.rs | 2 +-
src/query_stage.rs | 2 +-
12 files changed, 58 insertions(+), 57 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 36b824c..aa6cdff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1111,6 +1111,25 @@ dependencies = [
"strum 0.26.3",
]
+[[package]]
+name = "datafusion_ray"
+version = "0.6.0"
+dependencies = [
+ "datafusion",
+ "datafusion-proto",
+ "datafusion-python",
+ "futures",
+ "glob",
+ "log",
+ "prost 0.12.6",
+ "prost-types 0.12.6",
+ "pyo3",
+ "rustc_version",
+ "tokio",
+ "tonic-build",
+ "uuid",
+]
+
[[package]]
name = "digest"
version = "0.10.7"
@@ -2442,25 +2461,6 @@ dependencies = [
"getrandom",
]
-[[package]]
-name = "raysql"
-version = "0.6.0"
-dependencies = [
- "datafusion",
- "datafusion-proto",
- "datafusion-python",
- "futures",
- "glob",
- "log",
- "prost 0.12.6",
- "prost-types 0.12.6",
- "pyo3",
- "rustc_version",
- "tokio",
- "tonic-build",
- "uuid",
-]
-
[[package]]
name = "redox_syscall"
version = "0.5.4"
diff --git a/Cargo.toml b/Cargo.toml
index f1cc533..081520f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,8 +16,8 @@
# under the License.
[package]
-name = "raysql"
-description = "RaySQL: DataFusion on Ray"
+name = "datafusion_ray"
+description = "DataFusion on Ray"
homepage = "https://github.com/datafusion-contrib/ray-sql"
repository = "https://github.com/datafusion-contrib/ray-sql"
authors = ["Andy Grove <[email protected]>", "Frank Luan
<[email protected]>"]
@@ -38,7 +38,7 @@ log = "0.4"
prost = "0.12"
prost-types = "0.12"
pyo3 = { version = "0.21", features = ["extension-module", "abi3",
"abi3-py38"] }
-tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
+tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
uuid = "1.2"
[build-dependencies]
@@ -46,11 +46,11 @@ rustc_version = "0.4.0"
tonic-build = { version = "0.8", default-features = false, features =
["transport", "prost"] }
[lib]
-name = "raysql"
+name = "datafusion_ray"
crate-type = ["cdylib", "rlib"]
[package.metadata.maturin]
-name = "raysql._raysql_internal"
+name = "datafusion_ray._datafusion_ray_internal"
[profile.release]
codegen-units = 1
diff --git a/README.md b/README.md
index ebd7803..5aa86e0 100644
--- a/README.md
+++ b/README.md
@@ -17,11 +17,13 @@
under the License.
-->
-# datafusion-ray: DataFusion on Ray
+# DataFusion on Ray
-This is a research project to evaluate performing distributed SQL queries from
Python, using
+> This was originally a research project donated from
[ray-sql](https://github.com/datafusion-contrib/ray-sql) to evaluate performing
distributed SQL queries from Python, using
[Ray](https://www.ray.io/) and
[DataFusion](https://github.com/apache/arrow-datafusion).
+DataFusion Ray is a distributed SQL query engine powered by the Rust
implementation of [Apache Arrow](https://arrow.apache.org/), [Apache
DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/).
+
## Goals
- Demonstrate how easily new systems can be built on top of DataFusion. See
the [design documentation](./docs/README.md)
@@ -31,7 +33,9 @@ This is a research project to evaluate performing distributed
SQL queries from P
## Non Goals
-- Build and support a production system.
+- Re-build the cluster scheduling systems like what
[Ballista](https://datafusion.apache.org/ballista/) did.
+ - Ballista is extremely complex and utilizing Ray feels like it abstracts
some of that complexity away.
+ - DataFusion Ray is delegating cluster management to Ray.
## Example
@@ -42,7 +46,7 @@ import os
import pandas as pd
import ray
-from raysql import RaySqlContext
+from datafusion_ray import RaySqlContext
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -64,7 +68,7 @@ for record_batch in result_set:
## Status
-- RaySQL can run all queries in the TPC-H benchmark
+- DataFusion Ray can run all queries in the TPC-H benchmark
## Features
@@ -73,29 +77,29 @@ for record_batch in result_set:
## Limitations
-- Requires a shared file system currently
+- Requires a shared file system currently. Check details
[here](./docs/README.md#distributed-shuffle).
## Performance
-This chart shows the performance of RaySQL compared to Apache Spark for
+This chart shows the performance of DataFusion Ray compared to Apache Spark for
[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set
(10GB), running on a desktop (Threadripper
-with 24 physical cores). Both RaySQL and Spark are configured with 24
executors.
+with 24 physical cores). Both DataFusion Ray and Spark are configured with 24
executors.
### Overall Time
-RaySQL is ~1.9x faster overall for this scale factor and environment with
disk-based shuffle.
+DataFusion Ray is ~1.9x faster overall for this scale factor and environment
with disk-based shuffle.

### Per Query Time
-Spark is much faster on some queries, likely due to broadcast exchanges, which
RaySQL hasn't implemented yet.
+Spark is much faster on some queries, likely due to broadcast exchanges, which
DataFusion Ray hasn't implemented yet.

### Performance Plan
-I'm planning on experimenting with the following changes to improve
performance:
+There are plans to experiment with the following changes to improve performance:
- Make better use of Ray futures to run more tasks in parallel
- Use Ray object store for shuffle data transfer to reduce disk I/O cost
diff --git a/raysql/__init__.py b/datafusion_ray/__init__.py
similarity index 96%
rename from raysql/__init__.py
rename to datafusion_ray/__init__.py
index b608318..c8bae17 100644
--- a/raysql/__init__.py
+++ b/datafusion_ray/__init__.py
@@ -20,7 +20,7 @@ try:
except ImportError:
import importlib_metadata
-from ._raysql_internal import (
+from ._datafusion_ray_internal import (
Context,
ExecutionGraph,
QueryStage,
diff --git a/raysql/context.py b/datafusion_ray/context.py
similarity index 95%
rename from raysql/context.py
rename to datafusion_ray/context.py
index c44de4c..bd3aee3 100644
--- a/raysql/context.py
+++ b/datafusion_ray/context.py
@@ -23,8 +23,8 @@ from typing import Iterable
import pyarrow as pa
import ray
-import raysql
-from raysql import Context, ExecutionGraph, QueryStage
+import datafusion_ray
+from datafusion_ray import Context, ExecutionGraph, QueryStage
from typing import List
def schedule_execution(
@@ -73,7 +73,7 @@ def schedule_execution(
return ids, futures
# schedule the actual execution workers
- plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
+ plan_bytes =
datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
futures = []
opt = {}
opt["resources"] = {"worker": 1e-3}
@@ -153,7 +153,7 @@ def execute_query_stage(
ray.get([f for _, lst in child_outputs for f in lst])
# schedule the actual execution workers
- plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
+ plan_bytes =
datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
futures = []
opt = {}
opt["resources"] = {"worker": 1e-3}
@@ -179,7 +179,7 @@ def execute_query_partition(
*input_partitions: list[pa.RecordBatch],
) -> Iterable[pa.RecordBatch]:
start_time = time.time()
- plan = raysql.deserialize_execution_plan(plan_bytes)
+ plan = datafusion_ray.deserialize_execution_plan(plan_bytes)
# print(
# "Worker executing plan {} partition #{} with shuffle inputs
{}".format(
# plan.display(),
@@ -193,7 +193,7 @@ def execute_query_partition(
# This is delegating to DataFusion for execution, but this would be a good
place
# to plug in other execution engines by translating the plan into another
engine's plan
# (perhaps via Substrait, once DataFusion supports converting a physical
plan to Substrait)
- ret = raysql.execute_partition(plan, part, partitions)
+ ret = datafusion_ray.execute_partition(plan, part, partitions)
duration = time.time() - start_time
event = {
"cat": f"{stage_id}-{part}",
@@ -238,7 +238,7 @@ class RaySqlContext:
else:
# serialize the query stages and store in Ray object store
query_stages = [
- raysql.serialize_execution_plan(
+ datafusion_ray.serialize_execution_plan(
graph.get_query_stage(i).get_execution_plan()
)
for i in range(final_stage_id + 1)
diff --git a/raysql/main.py b/datafusion_ray/main.py
similarity index 95%
rename from raysql/main.py
rename to datafusion_ray/main.py
index e9fe382..9e9b97a 100644
--- a/raysql/main.py
+++ b/datafusion_ray/main.py
@@ -20,18 +20,15 @@ import os
from pyarrow import csv as pacsv
import ray
-from raysql import RaySqlContext
+from datafusion_ray import RaySqlContext
NUM_CPUS_PER_WORKER = 8
-SF = 10
+SF = 1
DATA_DIR = f"/mnt/data0/tpch/sf{SF}-parquet"
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
QUERIES_DIR = os.path.join(SCRIPT_DIR, f"../sqlbench-h/queries/sf={SF}")
RESULTS_DIR = f"results-sf{SF}"
-TRUTH_DIR = (
-
"/home/ubuntu/raysort/ray-sql/sqlbench-runners/spark/{RESULTS_DIR}/{RESULTS_DIR}"
-)
def setup_context(use_ray_shuffle: bool, num_workers: int = 2) ->
RaySqlContext:
@@ -104,7 +101,7 @@ def compare(q: int):
def tpch_bench():
- ray.init("auto")
+ ray.init(resources={"worker": 1})
num_workers = int(ray.cluster_resources().get("worker", 1)) *
NUM_CPUS_PER_WORKER
use_ray_shuffle = False
ctx = setup_context(use_ray_shuffle, num_workers)
diff --git a/raysql/ray_utils.py b/datafusion_ray/ray_utils.py
similarity index 100%
rename from raysql/ray_utils.py
rename to datafusion_ray/ray_utils.py
diff --git a/raysql/tests/test_context.py b/datafusion_ray/tests/test_context.py
similarity index 96%
rename from raysql/tests/test_context.py
rename to datafusion_ray/tests/test_context.py
index e1cc97c..d138620 100644
--- a/raysql/tests/test_context.py
+++ b/datafusion_ray/tests/test_context.py
@@ -16,7 +16,7 @@
# under the License.
import pytest
-from raysql import Context
+from datafusion_ray import Context
def test():
ctx = Context(1, False)
diff --git a/pyproject.toml b/pyproject.toml
index 432bd4f..7a53bcd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,11 +16,11 @@
# under the License.
[build-system]
-requires = ["maturin>=0.14,<0.15"]
+requires = ["maturin>=0.14,<1.7.4"]
build-backend = "maturin"
[project]
-name = "raysql"
+name = "datafusion-ray"
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Rust",
@@ -30,4 +30,4 @@ classifiers = [
[tool.maturin]
-module-name = "raysql._raysql_internal"
+module-name = "datafusion_ray._datafusion_ray_internal"
diff --git a/requirements-in.txt b/requirements-in.txt
index b0ba082..166b032 100644
--- a/requirements-in.txt
+++ b/requirements-in.txt
@@ -1,11 +1,11 @@
black
flake8
isort
-maturin[patchelf]
+maturin
mypy
numpy
pyarrow
pytest
-ray==2.3.0
+ray==2.37.0
toml
importlib_metadata; python_version < "3.8"
diff --git a/src/lib.rs b/src/lib.rs
index df53b52..4436ac4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,7 +31,7 @@ pub mod utils;
/// A Python module implemented in Rust.
#[pymodule]
-fn _raysql_internal(_py: Python, m: &PyModule) -> PyResult<()> {
+fn _datafusion_ray_internal(_py: Python, m: &PyModule) -> PyResult<()> {
// register classes that can be created directly from Python code
m.add_class::<context::PyContext>()?;
m.add_class::<planner::PyExecutionGraph>()?;
diff --git a/src/query_stage.rs b/src/query_stage.rs
index 1ada967..2a2f7af 100644
--- a/src/query_stage.rs
+++ b/src/query_stage.rs
@@ -24,7 +24,7 @@ use datafusion_python::physical_plan::PyExecutionPlan;
use pyo3::prelude::*;
use std::sync::Arc;
-#[pyclass(name = "QueryStage", module = "raysql", subclass)]
+#[pyclass(name = "QueryStage", module = "datafusion_ray", subclass)]
pub struct PyQueryStage {
stage: Arc<QueryStage>,
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]