This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git
The following commit(s) were added to refs/heads/main by this push:
new 8498b19 Repo Housekeeping, updating naming remove unused files (#70)
8498b19 is described below
commit 8498b1907c66a7f59e81ddc5d47f1eece9200ad1
Author: robtandy <[email protected]>
AuthorDate: Sun Mar 2 15:43:41 2025 -0500
Repo Housekeeping, updating naming remove unused files (#70)
---
.github/workflows/main.yml | 105 ++++++++++
.github/workflows/rust.yml | 60 ------
Cargo.toml | 2 +-
datafusion_ray/__init__.py | 6 +-
datafusion_ray/core.py | 271 ++++++++++++++-----------
datafusion_ray/util.py | 4 +
examples/ray_stage.py | 71 -------
examples/tips.py | 4 +-
pyproject.toml | 4 +-
requirements-in.txt | 12 --
src/codec.rs | 18 +-
src/context.rs | 12 +-
src/dataframe.rs | 35 ++--
src/lib.rs | 14 +-
src/physical.rs | 7 +-
src/{stage_service.rs => processor_service.rs} | 35 ++--
src/proto/datafusion_ray.proto | 2 +-
src/proto/generated/protobuf.rs | 2 +-
src/{ray_stage.rs => stage.rs} | 10 +-
src/{ray_stage_reader.rs => stage_reader.rs} | 10 +-
src/util.rs | 10 +-
testdata/tpch/.gitignore | 0
{testdata => tpch}/queries/q1.sql | 0
{testdata => tpch}/queries/q10.sql | 0
{testdata => tpch}/queries/q11.sql | 0
{testdata => tpch}/queries/q12.sql | 0
{testdata => tpch}/queries/q13.sql | 0
{testdata => tpch}/queries/q14.sql | 0
{testdata => tpch}/queries/q15.sql | 0
{testdata => tpch}/queries/q16.sql | 0
{testdata => tpch}/queries/q17.sql | 0
{testdata => tpch}/queries/q18.sql | 0
{testdata => tpch}/queries/q19.sql | 0
{testdata => tpch}/queries/q2.sql | 0
{testdata => tpch}/queries/q20.sql | 0
{testdata => tpch}/queries/q21.sql | 0
{testdata => tpch}/queries/q22.sql | 0
{testdata => tpch}/queries/q3.sql | 0
{testdata => tpch}/queries/q4.sql | 0
{testdata => tpch}/queries/q5.sql | 0
{testdata => tpch}/queries/q6.sql | 0
{testdata => tpch}/queries/q7.sql | 0
{testdata => tpch}/queries/q8.sql | 0
{testdata => tpch}/queries/q9.sql | 0
tpch/requirements.txt | 7 +-
tpch/tpc.py | 149 --------------
tpch/tpcbench.py | 22 +-
47 files changed, 355 insertions(+), 517 deletions(-)
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
new file mode 100644
index 0000000..bcbe6e2
--- /dev/null
+++ b/.github/workflows/main.yml
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: main
+on:
+ push:
+ branches: [main]
+ pull_request:
+ branches: [main]
+
+#concurrency:
+# group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{
github.workflow }}
+#cancel-in-progress: true
+
+jobs:
+ test-matrix:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ python-version:
+ #- "3.10"
+ #- "3.11"
+ - "3.12"
+ toolchain:
+ - "stable"
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Setup Rust Toolchain
+ uses: dtolnay/rust-toolchain@stable
+ id: rust-toolchain
+ with:
+ components: clippy,rustfmt
+
+ - name: Install Protoc
+ uses: arduino/setup-protoc@v3
+ with:
+ version: "27.4"
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Cache Cargo
+ uses: actions/cache@v4
+ with:
+ path: ~/.cargo
+ key: cargo-cache-${{ steps.rust-toolchain.outputs.cachekey }}-${{
hashFiles('Cargo.lock') }}
+
+ - name: Install dependencies and build
+ uses: astral-sh/setup-uv@v5
+ with:
+ python-version: ${{ matrix.python-version }}
+ enable-cache: true
+
+ - name: Create virtual env
+ run: |
+ uv venv
+
+ - name: Cache the generated dataset
+ id: cache-tpch-dataset
+ uses: actions/cache@v4
+ with:
+ path: ./testdata/tpch
+ key: tpch-data
+
+ - name: create the dataset
+ if: ${{ steps.cache-tpch-dataset.outputs.cache-hit != 'true' }}
+ run: |
+ uv add duckdb
+ uv run python tpch/make_data.py 1 testdata/tpch/
+
+ - name: build and install datafusion-ray
+ env:
+ RUST_BACKTRACE: 1
+ run: |
+ uv add 'ray[default]'
+ uv run --no-project maturin develop --uv
+
+ - name: validate tpch
+ env:
+ DATAFUSION_RAY_LOG_LEVEL: debug
+ RAY_COLOR_PREFIX: 1
+ RAY_DEDUP_LOGS: 0
+ run: |
+ uv run python tpch/tpcbench.py \
+ --data='file:///${{ github.workspace }}/testdata/tpch/' \
+ --concurrency 3 \
+ --partitions-per-worker 2 \
+ --batch-size=8192 \
+ --worker-pool-min=20 \
+ --validate
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
deleted file mode 100644
index 0c6a2b8..0000000
--- a/.github/workflows/rust.yml
+++ /dev/null
@@ -1,60 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http:/www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-name: Rust
-
-on:
- push:
- pull_request:
-
-env:
- CARGO_TERM_COLOR: always
- PYTHON_VERSION: 3.9
- TPCH_SCALING_FACTOR: "1"
- TPCH_TEST_PARTITIONS: "1"
- TPCH_DATA_PATH: "data"
-
-jobs:
- build:
- runs-on: ubuntu-latest
-
- steps:
- - uses: actions/checkout@v4
- - name: Install protobuf compiler
- shell: bash
- run: sudo apt-get install protobuf-compiler
- - name: Build Rust code
- run: cargo build --verbose
- - name: Set up Python
- uses: actions/setup-python@v5
- with:
- python-version: ${{ env.PYTHON_VERSION }}
- - name: Install test dependencies
- run: |
- python -m pip install --upgrade pip
- pip install -r tpch/requirements.txt
-# - name: Generate test data
-# run: |
-# ./scripts/gen-test-data.sh
- - name: Run Rust tests
- run: cargo test --verbose
-# - name: Run Python tests
-# run: |
-# python -m venv venv
-# source venv/bin/activate
-# pip install -r requirements-in.txt
-# maturin develop
-# python -m pytest
diff --git a/Cargo.toml b/Cargo.toml
index 0f37c09..0638fea 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,7 +22,7 @@ homepage = "https://github.com/apache/datafusion-ray"
repository = "https://github.com/apache/datafusion-ray"
authors = ["Apache DataFusion <[email protected]>"]
version = "0.1.0"
-edition = "2021"
+edition = "2024"
readme = "README.md"
license = "Apache-2.0"
rust-version = "1.85"
diff --git a/datafusion_ray/__init__.py b/datafusion_ray/__init__.py
index 365b56a..4dfe339 100644
--- a/datafusion_ray/__init__.py
+++ b/datafusion_ray/__init__.py
@@ -20,6 +20,10 @@ try:
except ImportError:
import importlib_metadata
-from .core import RayContext, exec_sql_on_tables, prettify, runtime_env,
RayStagePool
+from .core import DFRayContext, df_ray_runtime_env
+
+from . import util
+
+__all__ = ["DFRayContext", "df_ray_runtime_env", "util"]
__version__ = importlib_metadata.version(__name__)
diff --git a/datafusion_ray/core.py b/datafusion_ray/core.py
index 3e4cfe8..70955bc 100644
--- a/datafusion_ray/core.py
+++ b/datafusion_ray/core.py
@@ -23,15 +23,13 @@ import os
import pyarrow as pa
import asyncio
import ray
-import json
import time
from .friendly import new_friendly_name
from datafusion_ray._datafusion_ray_internal import (
- RayContext as RayContextInternal,
- RayDataFrame as RayDataFrameInternal,
- exec_sql_on_tables,
+ DFRayContext as DFRayContextInternal,
+ DFRayDataFrame as DFRayDataFrameInternal,
prettify,
)
@@ -52,7 +50,7 @@ setup_logging()
_log_level = os.environ.get("DATAFUSION_RAY_LOG_LEVEL", "ERROR").upper()
_rust_backtrace = os.environ.get("RUST_BACKTRACE", "0")
-runtime_env = {
+df_ray_runtime_env = {
"worker_process_setup_hook": setup_logging,
"env_vars": {
"DATAFUSION_RAY_LOG_LEVEL": _log_level,
@@ -88,7 +86,9 @@ async def wait_for(coros, name=""):
# wrap the coro in a task to work with python 3.10 and 3.11+ where
asyncio.wait semantics
# changed to not accept any awaitable
start = time.time()
- done, _ = await asyncio.wait([asyncio.create_task(_ensure_coro(c)) for c
in coros])
+ done, _ = await asyncio.wait(
+ [asyncio.create_task(_ensure_coro(c)) for c in coros]
+ )
end = time.time()
log.info(f"waiting for {name} took {end - start}s")
for d in done:
@@ -101,61 +101,63 @@ async def wait_for(coros, name=""):
return return_values
-class RayStagePool:
- """A pool of RayStage actors that can be acquired and released"""
+class DFRayProcessorPool:
+ """A pool of DFRayProcessor actors that can be acquired and released"""
# TODO: We can probably manage this set in a better way
- # This is not a threadsafe implementation.
+ # This is not a threadsafe implementation, though the
DFRayContextSupervisor accesses it
+ # from a single thread
+ #
# This is simple though and will suffice for now
def __init__(self, min_workers: int, max_workers: int):
self.min_workers = min_workers
self.max_workers = max_workers
- # a map of stage_key (a random identifier) to stage actor reference
+ # a map of processor_key (a random identifier) to stage actor reference
self.pool = {}
- # a map of stage_key to listening address
+ # a map of processor_key to listening address
self.addrs = {}
- # holds object references from the start_up method for each stage
- # we know all stages are listening when all of these refs have
+ # holds object references from the start_up method for each processor
+ # we know all processors are listening when all of these refs have
# been waited on. When they are ready we remove them from this set
- self.stages_started = set()
+ self.processors_started = set()
- # an event that is set when all stages are ready to serve
- self.stages_ready = asyncio.Event()
+ # an event that is set when all processors are ready to serve
+ self.processors_ready = asyncio.Event()
- # stages that are started but we need to get their address
+ # processors that are started but we need to get their address
self.need_address = set()
- # stages that we have the address for but need to start serving
+ # processors that we have the address for but need to start serving
self.need_serving = set()
- # stages in use
+ # processors in use
self.acquired = set()
- # stages available
+ # processors available
self.available = set()
for _ in range(min_workers):
- self._new_stage()
+ self._new_processor()
log.info(
- f"created ray stage pool (min_workers: {min_workers}, max_workers:
{max_workers})"
+ f"created ray processor pool (min_workers: {min_workers},
max_workers: {max_workers})"
)
async def start(self):
- if not self.stages_ready.is_set():
- await self._wait_for_stages_started()
+ if not self.processors_ready.is_set():
+ await self._wait_for_processors_started()
await self._wait_for_get_addrs()
await self._wait_for_serve()
- self.stages_ready.set()
+ self.processors_ready.set()
async def wait_for_ready(self):
- await self.stages_ready.wait()
+ await self.processors_ready.wait()
async def acquire(self, need=1):
- stage_keys = []
+ processor_keys = []
have = len(self.available)
total = len(self.available) + len(self.acquired)
@@ -164,61 +166,67 @@ class RayStagePool:
need_to_make = need - have
if need_to_make > can_make:
- raise Exception(f"Cannot allocate workers above
{self.max_workers}")
+ raise Exception(
+ f"Cannot allocate workers above {self.max_workers}"
+ )
if need_to_make > 0:
- log.debug(f"creating {need_to_make} additional stages")
+ log.debug(f"creating {need_to_make} additional processors")
for _ in range(need_to_make):
- self._new_stage()
- await wait_for([self.start()], "waiting for created stages")
+ self._new_processor()
+ await wait_for([self.start()], "waiting for created processors")
assert len(self.available) >= need
for _ in range(need):
- stage_key = self.available.pop()
- self.acquired.add(stage_key)
-
- stage_keys.append(stage_key)
-
- stages = [self.pool[sk] for sk in stage_keys]
- addrs = [self.addrs[sk] for sk in stage_keys]
- return (stages, stage_keys, addrs)
-
- def release(self, stage_keys: list[str]):
- for stage_key in stage_keys:
- self.acquired.remove(stage_key)
- self.available.add(stage_key)
-
- def _new_stage(self):
- self.stages_ready.clear()
- stage_key = new_friendly_name()
- log.debug(f"starting stage: {stage_key}")
- stage = RayStage.options(name=f"Stage: {stage_key}").remote(stage_key)
- self.pool[stage_key] = stage
- self.stages_started.add(stage.start_up.remote())
- self.available.add(stage_key)
-
- async def _wait_for_stages_started(self):
- log.info("waiting for stages to be ready")
- started_keys = await wait_for(self.stages_started, "stages to be
started")
- # we need the addresses of these stages still
+ processor_key = self.available.pop()
+ self.acquired.add(processor_key)
+
+ processor_keys.append(processor_key)
+
+ processors = [self.pool[sk] for sk in processor_keys]
+ addrs = [self.addrs[sk] for sk in processor_keys]
+ return (processors, processor_keys, addrs)
+
+ def release(self, processor_keys: list[str]):
+ for processor_key in processor_keys:
+ self.acquired.remove(processor_key)
+ self.available.add(processor_key)
+
+ def _new_processor(self):
+ self.processors_ready.clear()
+ processor_key = new_friendly_name()
+ log.debug(f"starting processor: {processor_key}")
+ processor = DFRayProcessor.options(
+ name=f"Processor : {processor_key}"
+ ).remote(processor_key)
+ self.pool[processor_key] = processor
+ self.processors_started.add(processor.start_up.remote())
+ self.available.add(processor_key)
+
+ async def _wait_for_processors_started(self):
+ log.info("waiting for processors to be ready")
+ started_keys = await wait_for(
+ self.processors_started, "processors to be started"
+ )
+ # we need the addresses of these processors still
self.need_address.update(set(started_keys))
- # we've started all the stages we know about
- self.stages_started = set()
- log.info("stages are all listening")
+ # we've started all the processors we know about
+ self.processors_started = set()
+ log.info("processors are all listening")
async def _wait_for_get_addrs(self):
# get the addresses in a pipelined fashion
refs = []
- stage_keys = []
- for stage_key in self.need_address:
- stage = self.pool[stage_key]
- refs.append(stage.addr.remote())
- stage_keys.append(stage_key)
+ processor_keys = []
+ for processor_key in self.need_address:
+ processor = self.pool[processor_key]
+ refs.append(processor.addr.remote())
+ processor_keys.append(processor_key)
- self.need_serving.add(stage_key)
+ self.need_serving.add(processor_key)
- addrs = await wait_for(refs, "stage addresses")
+ addrs = await wait_for(refs, "processor addresses")
for key, addr in addrs:
self.addrs[key] = addr
@@ -226,45 +234,49 @@ class RayStagePool:
self.need_address = set()
async def _wait_for_serve(self):
- log.info("running stages")
+ log.info("running processors")
try:
- for stage_key in self.need_serving:
- log.info(f"starting serving of stage {stage_key}")
- stage = self.pool[stage_key]
- stage.serve.remote()
+ for processor_key in self.need_serving:
+ log.info(f"starting serving of processor {processor_key}")
+ processor = self.pool[processor_key]
+ processor.serve.remote()
self.need_serving = set()
except Exception as e:
- log.error(f"StagePool: Uhandled Exception in serve: {e}")
+ log.error(f"ProcessorPool: Uhandled Exception in serve: {e}")
raise e
async def all_done(self):
- log.info("calling stage all done")
- refs = [stage.all_done.remote() for stage in self.pool.values()]
- await wait_for(refs, "stages to be all done")
- log.info("all stages shutdown")
+ log.info("calling processor all done")
+ refs = [
+ processor.all_done.remote() for processor in self.pool.values()
+ ]
+ await wait_for(refs, "processors to be all done")
+ log.info("all processors shutdown")
@ray.remote(num_cpus=0)
-class RayStage:
- def __init__(self, stage_key):
- self.stage_key = stage_key
+class DFRayProcessor:
+ def __init__(self, processor_key):
+ self.processor_key = processor_key
# import this here so ray doesn't try to serialize the rust extension
- from datafusion_ray._datafusion_ray_internal import StageService
+ from datafusion_ray._datafusion_ray_internal import (
+ DFRayProcessorService,
+ )
- self.stage_service = StageService(stage_key)
+ self.processor_service = DFRayProcessorService(processor_key)
async def start_up(self):
# this method is sync
- self.stage_service.start_up()
- return self.stage_key
+ self.processor_service.start_up()
+ return self.processor_key
async def all_done(self):
- await self.stage_service.all_done()
+ await self.processor_service.all_done()
async def addr(self):
- return (self.stage_key, self.stage_service.addr())
+ return (self.processor_key, self.processor_service.addr())
async def update_plan(
self,
@@ -273,7 +285,7 @@ class RayStage:
partition_group: list[int],
plan_bytes: bytes,
):
- await self.stage_service.update_plan(
+ await self.processor_service.update_plan(
stage_id,
stage_addrs,
partition_group,
@@ -281,9 +293,11 @@ class RayStage:
)
async def serve(self):
- log.info(f"[{self.stage_key}] serving on {self.stage_service.addr()}")
- await self.stage_service.serve()
- log.info(f"[{self.stage_key}] done serving")
+ log.info(
+ f"[{self.processor_key}] serving on
{self.processor_service.addr()}"
+ )
+ await self.processor_service.serve()
+ log.info(f"[{self.processor_key}] done serving")
@dataclass
@@ -304,7 +318,7 @@ class InternalStageData:
child_stage_ids: list[int]
num_output_partitions: int
full_partitions: bool
- remote_stage: ... # ray.actor.ActorHandle[RayStage]
+ remote_processor: ... # ray.actor.ActorHandle[DFRayProcessor]
remote_addr: str
def __str__(self):
@@ -312,16 +326,18 @@ class InternalStageData:
@ray.remote(num_cpus=0)
-class RayContextSupervisor:
+class DFRayContextSupervisor:
def __init__(
self,
worker_pool_min: int,
worker_pool_max: int,
) -> None:
- log.info(f"Creating RayContextSupervisor worker_pool_min:
{worker_pool_min}")
- self.pool = RayStagePool(worker_pool_min, worker_pool_max)
+ log.info(
+ f"Creating DFRayContextSupervisor worker_pool_min:
{worker_pool_min}"
+ )
+ self.pool = DFRayProcessorPool(worker_pool_min, worker_pool_max)
self.stages: dict[str, InternalStageData] = {}
- log.info("Created RayContextSupervisor")
+ log.info("Created DFRayContextSupervisor")
async def start(self):
await self.pool.start()
@@ -331,7 +347,9 @@ class RayContextSupervisor:
async def get_stage_addrs(self, stage_id: int):
addrs = [
- sd.remote_addr for sd in self.stages.values() if sd.stage_id ==
stage_id
+ sd.remote_addr
+ for sd in self.stages.values()
+ if sd.stage_id == stage_id
]
return addrs
@@ -342,23 +360,23 @@ class RayContextSupervisor:
if len(self.stages) > 0:
self.pool.release(list(self.stages.keys()))
- remote_stages, remote_stage_keys, remote_addrs = await
self.pool.acquire(
- len(stage_datas)
+ remote_processors, remote_processor_keys, remote_addrs = (
+ await self.pool.acquire(len(stage_datas))
)
self.stages = {}
for i, sd in enumerate(stage_datas):
- remote_stage = remote_stages[i]
- remote_stage_key = remote_stage_keys[i]
+ remote_processor = remote_processors[i]
+ remote_processor_key = remote_processor_keys[i]
remote_addr = remote_addrs[i]
- self.stages[remote_stage_key] = InternalStageData(
+ self.stages[remote_processor_key] = InternalStageData(
sd.stage_id,
sd.plan_bytes,
sd.partition_group,
sd.child_stage_ids,
sd.num_output_partitions,
sd.full_partitions,
- remote_stage,
+ remote_processor,
remote_addr,
)
@@ -379,9 +397,12 @@ class RayContextSupervisor:
log.info(f"going to update plan for {stage_key}")
kid = addrs_by_stage_key[stage_key]
refs.append(
- isd.remote_stage.update_plan.remote(
+ isd.remote_processor.update_plan.remote(
isd.stage_id,
- {stage_id: val["child_addrs"] for (stage_id, val) in
kid.items()},
+ {
+ stage_id: val["child_addrs"]
+ for (stage_id, val) in kid.items()
+ },
isd.partition_group,
isd.plan_bytes,
)
@@ -413,7 +434,9 @@ class RayContextSupervisor:
]
# sanity check
- assert all([op == output_partitions[0] for op in
output_partitions])
+ assert all(
+ [op == output_partitions[0] for op in output_partitions]
+ )
output_partitions = output_partitions[0]
for child_stage_isd in child_stage_datas:
@@ -440,16 +463,16 @@ class RayContextSupervisor:
await self.pool.all_done()
-class RayDataFrame:
+class DFRayDataFrame:
def __init__(
self,
- ray_internal_df: RayDataFrameInternal,
- supervisor, # ray.actor.ActorHandle[RayContextSupervisor],
+ internal_df: DFRayDataFrameInternal,
+ supervisor, # ray.actor.ActorHandle[DFRayContextSupervisor],
batch_size=8192,
partitions_per_worker: int | None = None,
prefetch_buffer_size=0,
):
- self.df = ray_internal_df
+ self.df = internal_df
self.supervisor = supervisor
self._stages = None
self._batches = None
@@ -461,7 +484,9 @@ class RayDataFrame:
# create our coordinator now, which we need to create stages
if not self._stages:
self._stages = self.df.stages(
- self.batch_size, self.prefetch_buffer_size,
self.partitions_per_worker
+ self.batch_size,
+ self.prefetch_buffer_size,
+ self.partitions_per_worker,
)
return self._stages
@@ -495,7 +520,9 @@ class RayDataFrame:
)
log.debug(f"last stage addrs {last_stage_addrs}")
- reader = self.df.read_final_stage(last_stage_id,
last_stage_addrs[0])
+ reader = self.df.read_final_stage(
+ last_stage_id, last_stage_addrs[0]
+ )
log.debug("got reader")
self._batches = list(reader)
return self._batches
@@ -528,7 +555,7 @@ class RayDataFrame:
call_sync(wait_for([ref], "creating ray stages"))
-class RayContext:
+class DFRayContext:
def __init__(
self,
batch_size: int = 8192,
@@ -537,12 +564,12 @@ class RayContext:
worker_pool_min: int = 1,
worker_pool_max: int = 100,
) -> None:
- self.ctx = RayContextInternal()
+ self.ctx = DFRayContextInternal()
self.batch_size = batch_size
self.partitions_per_worker = partitions_per_worker
self.prefetch_buffer_size = prefetch_buffer_size
- self.supervisor = RayContextSupervisor.options(
+ self.supervisor = DFRayContextSupervisor.options(
name="RayContextSupersisor",
).remote(
worker_pool_min,
@@ -564,14 +591,16 @@ class RayContext:
def register_parquet(self, name: str, path: str):
self.ctx.register_parquet(name, path)
- def register_listing_table(self, name: str, path: str,
file_extention="parquet"):
+ def register_listing_table(
+ self, name: str, path: str, file_extention="parquet"
+ ):
self.ctx.register_listing_table(name, path, file_extention)
- def sql(self, query: str) -> RayDataFrame:
+ def sql(self, query: str) -> DFRayDataFrame:
df = self.ctx.sql(query)
- return RayDataFrame(
+ return DFRayDataFrame(
df,
self.supervisor,
self.batch_size,
@@ -583,8 +612,6 @@ class RayContext:
self.ctx.set(option, value)
def __del__(self):
- log.info("RayContext, cleaning up remote resources")
+ log.info("DFRayContext, cleaning up remote resources")
ref = self.supervisor.all_done.remote()
- call_sync(wait_for([ref], "RayContextSupervisor all done"))
-
- # log.debug("all stage addrs set? or should be")
+ call_sync(wait_for([ref], "DFRayContextSupervisor all done"))
diff --git a/datafusion_ray/util.py b/datafusion_ray/util.py
new file mode 100644
index 0000000..d3fecf8
--- /dev/null
+++ b/datafusion_ray/util.py
@@ -0,0 +1,4 @@
+from datafusion_ray._datafusion_ray_internal import (
+ exec_sql_on_tables,
+ prettify,
+)
diff --git a/examples/ray_stage.py b/examples/ray_stage.py
deleted file mode 100644
index 72e983e..0000000
--- a/examples/ray_stage.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import argparse
-import time
-import os
-import ray
-from datafusion_ray import RayContext
-
-
-def go(data_dir: str, concurrency: int, isolate: bool):
- print(f"isolate {isolate}")
- ctx = RayContext(
- isolate_partitions=isolate, bucket="rob-tandy-tmp", batch_size=1024
- )
- ctx.set("datafusion.execution.target_partitions", str(concurrency))
- ctx.set("datafusion.catalog.information_schema", "true")
- ctx.set("datafusion.optimizer.enable_round_robin_repartition", "false")
- ctx.set("datafusion.execution.coalesce_batches", "false")
-
- for table in [
- "customer",
- "orders",
- ]:
- f = os.path.join(data_dir, f"{table}.parquet")
- print("Registering table", table, "using path", f)
- ctx.register_parquet(table, f)
-
- query = """SELECT customer.c_name, sum(orders.o_totalprice) as total_amount
- FROM customer JOIN orders ON customer.c_custkey = orders.o_custkey
- GROUP BY customer.c_name order by total_amount desc limit 10"""
-
- # query = """SELECT count(customer.c_name), customer.c_mktsegment from
customer group by customer.c_mktsegment limit 10"""
-
- df = ctx.sql(query)
- for stage in df.stages():
- print("Stage ", stage.stage_id)
- print(stage.execution_plan().display_indent())
-
- df.show()
-
- time.sleep(3)
-
-
-if __name__ == "__main__":
- ray.init(namespace="example")
- parser = argparse.ArgumentParser()
- parser.add_argument("--data", required=True, help="path to tpch*.parquet
files")
- parser.add_argument("--concurrency", required=True, type=int)
- parser.add_argument(
- "--isolate",
- action="store_true",
- help="do each partition as a separate ray actor, more concurrency",
- )
- args = parser.parse_args()
-
- go(args.data, args.concurrency, args.isolate)
diff --git a/examples/tips.py b/examples/tips.py
index 2df233b..7d72ba5 100644
--- a/examples/tips.py
+++ b/examples/tips.py
@@ -19,11 +19,11 @@ import argparse
import datafusion
import ray
-from datafusion_ray import RayContext
+from datafusion_ray import DFRayContext
def go(data_dir: str):
- ctx = RayContext()
+ ctx = DFRayContext()
# we could set this value to however many CPUs we plan to give each
# ray task
ctx.set("datafusion.execution.target_partitions", "1")
diff --git a/pyproject.toml b/pyproject.toml
index b81483b..32aa6e3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,14 +21,14 @@ build-backend = "maturin"
[project]
name = "datafusion-ray"
-requires-python = ">=3.7"
+requires-python = ">=3.10"
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
- "datafusion>=43.0.0",
+ "datafusion>=45.0.0",
"pyarrow>=18.0.0",
"typing-extensions;python_version<'3.13'",
]
diff --git a/requirements-in.txt b/requirements-in.txt
deleted file mode 100644
index 4812e1b..0000000
--- a/requirements-in.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-black
-flake8
-isort
-maturin
-mypy
-numpy
-pyarrow>=18.0.0
-pytest
-ray==2.40.0
-datafusion==43.1.0
-toml
-importlib_metadata; python_version < "3.8"
diff --git a/src/codec.rs b/src/codec.rs
index ade208d..b817e13 100644
--- a/src/codec.rs
+++ b/src/codec.rs
@@ -5,7 +5,7 @@ use crate::{
max_rows::MaxRowsExec,
pre_fetch::PrefetchExec,
protobuf::{
- MaxRowsExecNode, PartitionIsolatorExecNode, PrefetchExecNode,
RayStageReaderExecNode,
+ DfRayStageReaderExecNode, MaxRowsExecNode, PartitionIsolatorExecNode,
PrefetchExecNode,
},
};
@@ -24,7 +24,7 @@ use datafusion_proto::protobuf;
use prost::Message;
-use crate::ray_stage_reader::RayStageReaderExec;
+use crate::stage_reader::DFRayStageReaderExec;
#[derive(Debug)]
/// Physical Extension Codec for for DataFusion Ray plans
@@ -49,7 +49,7 @@ impl PhysicalExtensionCodec for RayCodec {
node.partition_count as usize,
)))
}
- } else if let Ok(node) = RayStageReaderExecNode::decode(buf) {
+ } else if let Ok(node) = DfRayStageReaderExecNode::decode(buf) {
let schema: Schema = node
.schema
.as_ref()
@@ -64,7 +64,7 @@ impl PhysicalExtensionCodec for RayCodec {
)?
.ok_or(internal_datafusion_err!("missing partitioning in proto"))?;
- Ok(Arc::new(RayStageReaderExec::try_new(
+ Ok(Arc::new(DFRayStageReaderExec::try_new(
part,
Arc::new(schema),
node.stage_id as usize,
@@ -99,14 +99,14 @@ impl PhysicalExtensionCodec for RayCodec {
}
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) ->
Result<()> {
- if let Some(reader) =
node.as_any().downcast_ref::<RayStageReaderExec>() {
+ if let Some(reader) =
node.as_any().downcast_ref::<DFRayStageReaderExec>() {
let schema: protobuf::Schema = reader.schema().try_into()?;
let partitioning: protobuf::Partitioning = serialize_partitioning(
reader.properties().output_partitioning(),
&DefaultPhysicalExtensionCodec {},
)?;
- let pb = RayStageReaderExecNode {
+ let pb = DfRayStageReaderExecNode {
schema: Some(schema),
partitioning: Some(partitioning),
stage_id: reader.stage_id as u64,
@@ -151,7 +151,7 @@ impl PhysicalExtensionCodec for RayCodec {
#[cfg(test)]
mod test {
use super::*;
- use crate::ray_stage_reader::RayStageReaderExec;
+ use crate::stage_reader::DFRayStageReaderExec;
use arrow::datatypes::DataType;
use datafusion::{
physical_plan::{display::DisplayableExecutionPlan, displayable,
Partitioning},
@@ -169,7 +169,7 @@ mod test {
]));
let ctx = SessionContext::new();
let part = Partitioning::UnknownPartitioning(2);
- let exec = Arc::new(RayStageReaderExec::try_new(part, schema,
1).unwrap());
+ let exec = Arc::new(DFRayStageReaderExec::try_new(part, schema,
1).unwrap());
let codec = RayCodec {};
let mut buf = vec![];
codec.try_encode(exec.clone(), &mut buf).unwrap();
@@ -185,7 +185,7 @@ mod test {
let ctx = SessionContext::new();
let part = Partitioning::UnknownPartitioning(2);
let exec = Arc::new(MaxRowsExec::new(
- Arc::new(RayStageReaderExec::try_new(part, schema, 1).unwrap()),
+ Arc::new(DFRayStageReaderExec::try_new(part, schema, 1).unwrap()),
10,
));
let codec = RayCodec {};
diff --git a/src/context.rs b/src/context.rs
index d99962c..191d632 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -23,20 +23,20 @@ use object_store::aws::AmazonS3Builder;
use pyo3::prelude::*;
use std::sync::Arc;
-use crate::dataframe::RayDataFrame;
+use crate::dataframe::DFRayDataFrame;
use crate::physical::RayStageOptimizerRule;
use crate::util::ResultExt;
use url::Url;
-/// Internal Session Context object for the python class RayContext
+/// Internal Session Context object for the python class DFRayContext
#[pyclass]
-pub struct RayContext {
+pub struct DFRayContext {
/// our datafusion context
ctx: SessionContext,
}
#[pymethods]
-impl RayContext {
+impl DFRayContext {
#[new]
pub fn new() -> PyResult<Self> {
let rule = RayStageOptimizerRule::new();
@@ -93,10 +93,10 @@ impl RayContext {
.to_py_err()
}
- pub fn sql(&self, py: Python, query: String) -> PyResult<RayDataFrame> {
+ pub fn sql(&self, py: Python, query: String) -> PyResult<DFRayDataFrame> {
let df = wait_for_future(py, self.ctx.sql(&query))?;
- Ok(RayDataFrame::new(df))
+ Ok(DFRayDataFrame::new(df))
}
pub fn set(&self, option: String, value: String) -> PyResult<()> {
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 56e76fb..adecf6b 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -48,14 +48,14 @@ use tokio::sync::Mutex;
use crate::isolator::PartitionIsolatorExec;
use crate::max_rows::MaxRowsExec;
use crate::pre_fetch::PrefetchExec;
-use crate::ray_stage::RayStageExec;
-use crate::ray_stage_reader::RayStageReaderExec;
+use crate::stage::DFRayStageExec;
+use crate::stage_reader::DFRayStageReaderExec;
use crate::util::collect_from_stage;
use crate::util::display_plan_with_partition_counts;
use crate::util::physical_plan_to_bytes;
use crate::util::ResultExt;
-/// Internal rust class beyind the RayDataFrame python object
+/// Internal rust class behind the DFRayDataFrame python object
///
/// It is a container for a plan for a query, as we would expect.
///
@@ -67,7 +67,7 @@ use crate::util::ResultExt;
/// The second role of this object is to be able to fetch record batches from
the final_
/// stage in the plan and return them to python.
#[pyclass]
-pub struct RayDataFrame {
+pub struct DFRayDataFrame {
/// holds the logical plan of the query we will execute
df: DataFrame,
/// the physical plan we will use to consume the final stage.
@@ -75,7 +75,7 @@ pub struct RayDataFrame {
final_plan: Option<Arc<dyn ExecutionPlan>>,
}
-impl RayDataFrame {
+impl DFRayDataFrame {
pub fn new(df: DataFrame) -> Self {
Self {
df,
@@ -85,7 +85,7 @@ impl RayDataFrame {
}
#[pymethods]
-impl RayDataFrame {
+impl DFRayDataFrame {
#[pyo3(signature = (batch_size, prefetch_buffer_size,
partitions_per_worker=None))]
fn stages(
&mut self,
@@ -93,7 +93,7 @@ impl RayDataFrame {
batch_size: usize,
prefetch_buffer_size: usize,
partitions_per_worker: Option<usize>,
- ) -> PyResult<Vec<PyDataFrameStage>> {
+ ) -> PyResult<Vec<PyDFRayStage>> {
let mut stages = vec![];
// TODO: This can be done more efficiently, likely in one pass but I'm
@@ -109,7 +109,7 @@ impl RayDataFrame {
display_plan_with_partition_counts(&plan)
);
- if let Some(stage_exec) =
plan.as_any().downcast_ref::<RayStageExec>() {
+ if let Some(stage_exec) =
plan.as_any().downcast_ref::<DFRayStageExec>() {
let input = plan.children();
assert!(input.len() == 1, "RayStageExec must have exactly one
child");
let input = input[0];
@@ -120,7 +120,7 @@ impl RayDataFrame {
plan.output_partitioning().partition_count()
);
- let replacement = Arc::new(RayStageReaderExec::try_new(
+ let replacement = Arc::new(DFRayStageReaderExec::try_new(
plan.output_partitioning().clone(),
input.schema(),
stage_exec.stage_id,
@@ -145,7 +145,7 @@ impl RayDataFrame {
displayable(plan.as_ref()).one_line()
);
- if let Some(stage_exec) =
plan.as_any().downcast_ref::<RayStageExec>() {
+ if let Some(stage_exec) =
plan.as_any().downcast_ref::<DFRayStageExec>() {
trace!("ray stage exec");
let input = plan.children();
assert!(input.len() == 1, "RayStageExec must have exactly one
child");
@@ -153,7 +153,7 @@ impl RayDataFrame {
let fixed_plan = input.clone().transform_down(down)?.data;
- let stage = PyDataFrameStage::new(
+ let stage = PyDFRayStage::new(
stage_exec.stage_id,
fixed_plan,
partition_groups.clone(),
@@ -231,7 +231,7 @@ impl RayDataFrame {
return internal_err!("Last stage expected to have one
partition").to_py_err();
}
- last_stage = PyDataFrameStage::new(
+ last_stage = PyDFRayStage::new(
last_stage.stage_id,
Arc::new(MaxRowsExec::new(
Arc::new(CoalesceBatchesExec::new(last_stage.plan, batch_size))
@@ -244,7 +244,7 @@ impl RayDataFrame {
// done fixing last stage
- let reader_plan = Arc::new(RayStageReaderExec::try_new_from_input(
+ let reader_plan = Arc::new(DFRayStageReaderExec::try_new_from_input(
last_stage.plan.clone(),
last_stage.stage_id,
)?) as Arc<dyn ExecutionPlan>;
@@ -298,6 +298,7 @@ impl RayDataFrame {
}
}
+#[allow(clippy::type_complexity)]
fn build_replacement(
plan: Arc<dyn ExecutionPlan>,
prefetch_buffer_size: usize,
@@ -351,7 +352,7 @@ fn build_replacement(
/// A Python class to hold a physical plan of a single stage
#[pyclass]
-pub struct PyDataFrameStage {
+pub struct PyDFRayStage {
/// our stage id
stage_id: usize,
/// the physical plan of our stage
@@ -364,7 +365,7 @@ pub struct PyDataFrameStage {
/// CombinedRecordBatchStream
full_partitions: bool,
}
-impl PyDataFrameStage {
+impl PyDFRayStage {
fn new(
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
@@ -381,7 +382,7 @@ impl PyDataFrameStage {
}
#[pymethods]
-impl PyDataFrameStage {
+impl PyDFRayStage {
#[getter]
fn stage_id(&self) -> usize {
self.stage_id
@@ -410,7 +411,7 @@ impl PyDataFrameStage {
self.plan
.clone()
.transform_down(|node: Arc<dyn ExecutionPlan>| {
- if let Some(reader) =
node.as_any().downcast_ref::<RayStageReaderExec>() {
+ if let Some(reader) =
node.as_any().downcast_ref::<DFRayStageReaderExec>() {
result.push(reader.stage_id);
}
Ok(Transformed::no(node))
diff --git a/src/lib.rs b/src/lib.rs
index 0a7e04f..5158f5c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,18 +31,18 @@ pub mod isolator;
pub mod max_rows;
pub mod physical;
pub mod pre_fetch;
-pub mod ray_stage;
-pub mod ray_stage_reader;
-pub mod stage_service;
+pub mod processor_service;
+pub mod stage;
+pub mod stage_reader;
pub mod util;
#[pymodule]
fn _datafusion_ray_internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
setup_logging();
- m.add_class::<context::RayContext>()?;
- m.add_class::<dataframe::RayDataFrame>()?;
- m.add_class::<dataframe::PyDataFrameStage>()?;
- m.add_class::<stage_service::StageService>()?;
+ m.add_class::<context::DFRayContext>()?;
+ m.add_class::<dataframe::DFRayDataFrame>()?;
+ m.add_class::<dataframe::PyDFRayStage>()?;
+ m.add_class::<processor_service::DFRayProcessorService>()?;
m.add_function(wrap_pyfunction!(util::prettify, m)?)?;
m.add_function(wrap_pyfunction!(util::exec_sql_on_tables, m)?)?;
Ok(())
diff --git a/src/physical.rs b/src/physical.rs
index 428ae94..9e05ad1 100644
--- a/src/physical.rs
+++ b/src/physical.rs
@@ -25,7 +25,7 @@ use datafusion::physical_plan::ExecutionPlan;
use log::debug;
use std::sync::Arc;
-use crate::ray_stage::RayStageExec;
+use crate::stage::DFRayStageExec;
use crate::util::display_plan_with_partition_counts;
/// This optimizer rule walks up the physical plan tree
@@ -70,7 +70,7 @@ impl PhysicalOptimizerRule for RayStageOptimizerRule {
|| plan.as_any().downcast_ref::<SortExec>().is_some()
|| plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some()
{
- let stage = Arc::new(RayStageExec::new(plan, stage_counter));
+ let stage = Arc::new(DFRayStageExec::new(plan, stage_counter));
stage_counter += 1;
Ok(Transformed::yes(stage as Arc<dyn ExecutionPlan>))
} else {
@@ -79,7 +79,8 @@ impl PhysicalOptimizerRule for RayStageOptimizerRule {
};
let plan = plan.transform_up(up)?.data;
- let final_plan = Arc::new(RayStageExec::new(plan, stage_counter)) as
Arc<dyn ExecutionPlan>;
+ let final_plan =
+ Arc::new(DFRayStageExec::new(plan, stage_counter)) as Arc<dyn
ExecutionPlan>;
debug!(
"optimized physical plan:\n{}",
diff --git a/src/stage_service.rs b/src/processor_service.rs
similarity index 93%
rename from src/stage_service.rs
rename to src/processor_service.rs
index 949bae2..5164577 100644
--- a/src/stage_service.rs
+++ b/src/processor_service.rs
@@ -50,8 +50,8 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::flight::{FlightHandler, FlightServ};
use crate::isolator::PartitionGroup;
use crate::util::{
- bytes_to_physical_plan, display_plan_with_partition_counts, extract_ticket,
- input_stage_ids, make_client, ResultExt,
+ bytes_to_physical_plan, display_plan_with_partition_counts,
extract_ticket, input_stage_ids,
+ make_client, ResultExt,
};
/// a map of stage_id, partition to a list FlightClients that can serve
@@ -59,23 +59,23 @@ use crate::util::{
/// will consume the partition from all clients and merge the results.
pub(crate) struct ServiceClients(pub HashMap<(usize, usize),
Mutex<Vec<FlightClient>>>);
-/// StageHandler is a [`FlightHandler`] that serves streams of partitions from
a hosted Physical Plan
+/// DFRayProcessorHandler is a [`FlightHandler`] that serves streams of
partitions from a hosted Physical Plan
/// It only responds to the DoGet Arrow Flight method.
-struct StageHandler {
+struct DFRayProcessorHandler {
/// our name, useful for logging
name: String,
/// Inner state of the handler
- inner: RwLock<Option<StageHandlerInner>>,
+ inner: RwLock<Option<DFRayProcessorHandlerInner>>,
}
-struct StageHandlerInner {
+struct DFRayProcessorHandlerInner {
/// the physical plan that comprises our stage
pub(crate) plan: Arc<dyn ExecutionPlan>,
/// the session context we will use to execute the plan
pub(crate) ctx: SessionContext,
}
-impl StageHandler {
+impl DFRayProcessorHandler {
pub fn new(name: String) -> Self {
let inner = RwLock::new(None);
@@ -88,13 +88,14 @@ impl StageHandler {
plan: Arc<dyn ExecutionPlan>,
partition_group: Vec<usize>,
) -> DFResult<()> {
- let inner = StageHandlerInner::new(stage_id, stage_addrs, plan,
partition_group).await?;
+ let inner =
+ DFRayProcessorHandlerInner::new(stage_id, stage_addrs, plan,
partition_group).await?;
self.inner.write().replace(inner);
Ok(())
}
}
-impl StageHandlerInner {
+impl DFRayProcessorHandlerInner {
pub async fn new(
stage_id: usize,
stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
@@ -169,7 +170,7 @@ impl StageHandlerInner {
}
fn make_stream(
- inner: &StageHandlerInner,
+ inner: &DFRayProcessorHandlerInner,
partition: usize,
) -> Result<impl Stream<Item = Result<RecordBatch, FlightError>> + Send +
'static, Status> {
let task_ctx = inner.ctx.task_ctx();
@@ -190,7 +191,7 @@ fn make_stream(
}
#[async_trait]
-impl FlightHandler for StageHandler {
+impl FlightHandler for DFRayProcessorHandler {
async fn get_stream(
&self,
request: Request<Ticket>,
@@ -234,22 +235,22 @@ impl FlightHandler for StageHandler {
}
}
-/// StageService is a Arrow Flight service that serves streams of
+/// DFRayProcessorService is an Arrow Flight service that serves streams of
/// partitions from a hosted Physical Plan
///
/// It only responds to the DoGet Arrow Flight method
#[pyclass]
-pub struct StageService {
+pub struct DFRayProcessorService {
name: String,
listener: Option<TcpListener>,
- handler: Arc<StageHandler>,
+ handler: Arc<DFRayProcessorHandler>,
addr: Option<String>,
all_done_tx: Arc<Mutex<Sender<()>>>,
all_done_rx: Option<Receiver<()>>,
}
#[pymethods]
-impl StageService {
+impl DFRayProcessorService {
#[new]
pub fn new(name: String) -> PyResult<Self> {
let name = format!("[{}]", name);
@@ -259,7 +260,7 @@ impl StageService {
let (all_done_tx, all_done_rx) = channel(1);
let all_done_tx = Arc::new(Mutex::new(all_done_tx));
- let handler = Arc::new(StageHandler::new(name.clone()));
+ let handler = Arc::new(DFRayProcessorHandler::new(name.clone()));
Ok(Self {
name,
@@ -313,7 +314,7 @@ impl StageService {
}
/// replace the plan that this service was providing, we will do this when
we want
- /// to reuse the StageService for a subsequent query
+ /// to reuse the DFRayProcessorService for a subsequent query
///
/// returns a python coroutine that should be awaited
pub fn update_plan<'a>(
diff --git a/src/proto/datafusion_ray.proto b/src/proto/datafusion_ray.proto
index 75d3ab1..5516b32 100644
--- a/src/proto/datafusion_ray.proto
+++ b/src/proto/datafusion_ray.proto
@@ -9,7 +9,7 @@ option java_outer_classname = "RayDataFusionProto";
import "datafusion_common.proto";
import "datafusion.proto";
-message RayStageReaderExecNode {
+message DFRayStageReaderExecNode {
// schema of the stage we will consume
datafusion_common.Schema schema = 1;
// properties of the stage we will consume
diff --git a/src/proto/generated/protobuf.rs b/src/proto/generated/protobuf.rs
index 212a366..e5ffe9e 100644
--- a/src/proto/generated/protobuf.rs
+++ b/src/proto/generated/protobuf.rs
@@ -1,6 +1,6 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct RayStageReaderExecNode {
+pub struct DfRayStageReaderExecNode {
/// schema of the stage we will consume
#[prost(message, optional, tag = "1")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
diff --git a/src/ray_stage.rs b/src/stage.rs
similarity index 97%
rename from src/ray_stage.rs
rename to src/stage.rs
index f7a0649..e1b139d 100644
--- a/src/ray_stage.rs
+++ b/src/stage.rs
@@ -79,7 +79,7 @@ use datafusion::{arrow::datatypes::SchemaRef,
execution::SendableRecordBatchStre
///
/// See [`crate::isolator::PartitionIsolatorExec`] for more information on how
the shadow partitions work
#[derive(Debug)]
-pub struct RayStageExec {
+pub struct DFRayStageExec {
/// Input plan
pub(crate) input: Arc<dyn ExecutionPlan>,
/// Output partitioning
@@ -87,7 +87,7 @@ pub struct RayStageExec {
pub stage_id: usize,
}
-impl RayStageExec {
+impl DFRayStageExec {
pub fn new(input: Arc<dyn ExecutionPlan>, stage_id: usize) -> Self {
let properties = input.properties().clone();
@@ -110,7 +110,7 @@ impl RayStageExec {
}
}
}
-impl DisplayAs for RayStageExec {
+impl DisplayAs for DFRayStageExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
write!(
f,
@@ -121,7 +121,7 @@ impl DisplayAs for RayStageExec {
}
}
-impl ExecutionPlan for RayStageExec {
+impl ExecutionPlan for DFRayStageExec {
fn schema(&self) -> SchemaRef {
self.input.schema()
}
@@ -152,7 +152,7 @@ impl ExecutionPlan for RayStageExec {
// as the plan tree is rearranged we want to remember the original
partitioning that we
// had, even if we get new inputs. This is because
RayStageReaderExecs, when created by
// the RayDataFrame will need to know the original partitioning
- Ok(Arc::new(RayStageExec::new_with_properties(
+ Ok(Arc::new(DFRayStageExec::new_with_properties(
child,
self.stage_id,
self.properties.clone(),
diff --git a/src/ray_stage_reader.rs b/src/stage_reader.rs
similarity index 96%
rename from src/ray_stage_reader.rs
rename to src/stage_reader.rs
index c64a154..0916be4 100644
--- a/src/ray_stage_reader.rs
+++ b/src/stage_reader.rs
@@ -15,8 +15,8 @@ use futures::StreamExt;
use log::trace;
use prost::Message;
+use crate::processor_service::ServiceClients;
use crate::protobuf::FlightTicketData;
-use crate::stage_service::ServiceClients;
use crate::util::CombinedRecordBatchStream;
/// An [`ExecutionPlan`] that will produce a stream of batches fetched from
another stage
@@ -25,13 +25,13 @@ use crate::util::CombinedRecordBatchStream;
/// Note that discovery of the service is handled by populating an instance of
[`crate::processor_service::ServiceClients`]
/// and storing it as an extension in the
[`datafusion::execution::TaskContext`] configuration.
#[derive(Debug)]
-pub struct RayStageReaderExec {
+pub struct DFRayStageReaderExec {
properties: PlanProperties,
schema: SchemaRef,
pub stage_id: usize,
}
-impl RayStageReaderExec {
+impl DFRayStageReaderExec {
pub fn try_new_from_input(input: Arc<dyn ExecutionPlan>, stage_id: usize)
-> Result<Self> {
let properties = input.properties().clone();
@@ -53,7 +53,7 @@ impl RayStageReaderExec {
})
}
}
-impl DisplayAs for RayStageReaderExec {
+impl DisplayAs for DFRayStageReaderExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
write!(
f,
@@ -64,7 +64,7 @@ impl DisplayAs for RayStageReaderExec {
}
}
-impl ExecutionPlan for RayStageReaderExec {
+impl ExecutionPlan for DFRayStageReaderExec {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
diff --git a/src/util.rs b/src/util.rs
index 941c6e4..0c07c5c 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -38,9 +38,9 @@ use pyo3::types::{PyBytes, PyList};
use tonic::transport::Channel;
use crate::codec::RayCodec;
+use crate::processor_service::ServiceClients;
use crate::protobuf::FlightTicketData;
-use crate::ray_stage_reader::RayStageReaderExec;
-use crate::stage_service::ServiceClients;
+use crate::stage_reader::DFRayStageReaderExec;
use prost::Message;
use tokio::macros::support::thread_rng_n;
@@ -218,7 +218,7 @@ pub fn input_stage_ids(plan: &Arc<dyn ExecutionPlan>) ->
Result<Vec<usize>, Data
let mut result = vec![];
plan.clone()
.transform_down(|node: Arc<dyn ExecutionPlan>| {
- if let Some(reader) =
node.as_any().downcast_ref::<RayStageReaderExec>() {
+ if let Some(reader) =
node.as_any().downcast_ref::<DFRayStageReaderExec>() {
result.push(reader.stage_id);
}
Ok(Transformed::no(node))
@@ -486,7 +486,7 @@ mod test {
// test with file
let tables = vec![(
"people".to_string(),
- format!("file://{}", file.path().to_str().unwrap().to_string()),
+ format!("file://{}", file.path().to_str().unwrap()),
)];
let query = "SELECT * FROM people ORDER BY age".to_string();
let res = exec_sql(query.clone(), tables).await.unwrap();
@@ -501,7 +501,7 @@ mod test {
// test with dir
let tables = vec![(
"people".to_string(),
- format!("file://{}/", dir.path().to_str().unwrap().to_string()),
+ format!("file://{}/", dir.path().to_str().unwrap()),
)];
let res = exec_sql(query, tables).await.unwrap();
assert_eq!(
diff --git a/testdata/tpch/.gitignore b/testdata/tpch/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/testdata/queries/q1.sql b/tpch/queries/q1.sql
similarity index 100%
rename from testdata/queries/q1.sql
rename to tpch/queries/q1.sql
diff --git a/testdata/queries/q10.sql b/tpch/queries/q10.sql
similarity index 100%
rename from testdata/queries/q10.sql
rename to tpch/queries/q10.sql
diff --git a/testdata/queries/q11.sql b/tpch/queries/q11.sql
similarity index 100%
rename from testdata/queries/q11.sql
rename to tpch/queries/q11.sql
diff --git a/testdata/queries/q12.sql b/tpch/queries/q12.sql
similarity index 100%
rename from testdata/queries/q12.sql
rename to tpch/queries/q12.sql
diff --git a/testdata/queries/q13.sql b/tpch/queries/q13.sql
similarity index 100%
rename from testdata/queries/q13.sql
rename to tpch/queries/q13.sql
diff --git a/testdata/queries/q14.sql b/tpch/queries/q14.sql
similarity index 100%
rename from testdata/queries/q14.sql
rename to tpch/queries/q14.sql
diff --git a/testdata/queries/q15.sql b/tpch/queries/q15.sql
similarity index 100%
rename from testdata/queries/q15.sql
rename to tpch/queries/q15.sql
diff --git a/testdata/queries/q16.sql b/tpch/queries/q16.sql
similarity index 100%
rename from testdata/queries/q16.sql
rename to tpch/queries/q16.sql
diff --git a/testdata/queries/q17.sql b/tpch/queries/q17.sql
similarity index 100%
rename from testdata/queries/q17.sql
rename to tpch/queries/q17.sql
diff --git a/testdata/queries/q18.sql b/tpch/queries/q18.sql
similarity index 100%
rename from testdata/queries/q18.sql
rename to tpch/queries/q18.sql
diff --git a/testdata/queries/q19.sql b/tpch/queries/q19.sql
similarity index 100%
rename from testdata/queries/q19.sql
rename to tpch/queries/q19.sql
diff --git a/testdata/queries/q2.sql b/tpch/queries/q2.sql
similarity index 100%
rename from testdata/queries/q2.sql
rename to tpch/queries/q2.sql
diff --git a/testdata/queries/q20.sql b/tpch/queries/q20.sql
similarity index 100%
rename from testdata/queries/q20.sql
rename to tpch/queries/q20.sql
diff --git a/testdata/queries/q21.sql b/tpch/queries/q21.sql
similarity index 100%
rename from testdata/queries/q21.sql
rename to tpch/queries/q21.sql
diff --git a/testdata/queries/q22.sql b/tpch/queries/q22.sql
similarity index 100%
rename from testdata/queries/q22.sql
rename to tpch/queries/q22.sql
diff --git a/testdata/queries/q3.sql b/tpch/queries/q3.sql
similarity index 100%
rename from testdata/queries/q3.sql
rename to tpch/queries/q3.sql
diff --git a/testdata/queries/q4.sql b/tpch/queries/q4.sql
similarity index 100%
rename from testdata/queries/q4.sql
rename to tpch/queries/q4.sql
diff --git a/testdata/queries/q5.sql b/tpch/queries/q5.sql
similarity index 100%
rename from testdata/queries/q5.sql
rename to tpch/queries/q5.sql
diff --git a/testdata/queries/q6.sql b/tpch/queries/q6.sql
similarity index 100%
rename from testdata/queries/q6.sql
rename to tpch/queries/q6.sql
diff --git a/testdata/queries/q7.sql b/tpch/queries/q7.sql
similarity index 100%
rename from testdata/queries/q7.sql
rename to tpch/queries/q7.sql
diff --git a/testdata/queries/q8.sql b/tpch/queries/q8.sql
similarity index 100%
rename from testdata/queries/q8.sql
rename to tpch/queries/q8.sql
diff --git a/testdata/queries/q9.sql b/tpch/queries/q9.sql
similarity index 100%
rename from testdata/queries/q9.sql
rename to tpch/queries/q9.sql
diff --git a/tpch/requirements.txt b/tpch/requirements.txt
index 2d257db..e995868 100644
--- a/tpch/requirements.txt
+++ b/tpch/requirements.txt
@@ -1,4 +1,3 @@
-# This is a bad idea, we should lock dependencies with poetry and consume this
tool as an action
-pyarrow
-datafusion
-argparse
+duckdb
+ray[default]
+maturin
diff --git a/tpch/tpc.py b/tpch/tpc.py
deleted file mode 100644
index 8fa5ae8..0000000
--- a/tpch/tpc.py
+++ /dev/null
@@ -1,149 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#
-#
-# This file is useful for running a query against a TPCH dataset.
-#
-# You can run an arbitrary query by passing --query 'select...' or you can run
a
-# TPCH query by passing --qnum 1-22.
-
-import argparse
-import ray
-from datafusion_ray import RayContext, runtime_env
-import os
-import sys
-import time
-
-try:
- import duckdb
-except ImportError:
- print(
- "duckdb not installed, which is used in this file for retrieving the
TPCH query"
- )
- sys.exit(1)
-
-
-def make_ctx(
- data_path: str,
- concurrency: int,
- batch_size: int,
- partitions_per_worker: int | None,
- worker_pool_min: int,
- listing_tables: bool,
-):
-
- # Register the tables
- table_names = [
- "customer",
- "lineitem",
- "nation",
- "orders",
- "part",
- "partsupp",
- "region",
- "supplier",
- ]
- # Connect to a cluster
- # use ray job submit
- ray.init(runtime_env=runtime_env)
-
- ctx = RayContext(
- batch_size=batch_size,
- partitions_per_worker=partitions_per_worker,
- worker_pool_min=worker_pool_min,
- )
-
- ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
- # ctx.set("datafusion.execution.parquet.pushdown_filters", "true")
- ctx.set("datafusion.optimizer.enable_round_robin_repartition", "false")
- ctx.set("datafusion.execution.coalesce_batches", "false")
-
- for table in table_names:
- path = os.path.join(data_path, f"{table}.parquet")
- print(f"Registering table {table} using path {path}")
- if listing_tables:
- ctx.register_listing_table(table, f"{path}/")
- else:
- ctx.register_parquet(table, path)
-
- return ctx
-
-
-def main(
- data_path: str,
- concurrency: int,
- batch_size: int,
- query: str,
- partitions_per_worker: int | None,
- worker_pool_min: int,
- listing_tables,
-) -> None:
- ctx = make_ctx(
- data_path,
- concurrency,
- batch_size,
- partitions_per_worker,
- worker_pool_min,
- listing_tables,
- )
- df = ctx.sql(query)
- time.sleep(3)
- df.show()
-
-
-def tpch_query(qnum: int) -> str:
- query_path = os.path.join(os.path.dirname(__file__), "..", "testdata",
"queries")
- return open(os.path.join(query_path, f"q{qnum}.sql")).read()
-
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("--data", type=str, help="data path")
- parser.add_argument("--query", type=str, help="query")
- parser.add_argument(
- "--qnum", type=int, default=0, help="query number for TPCH benchmark"
- )
- parser.add_argument("--concurrency", type=int, help="concurrency")
- parser.add_argument("--batch-size", type=int, help="batch size")
- parser.add_argument(
- "--partitions-per-worker",
- type=int,
- help="Max partitions per Stage Service Worker",
- )
- parser.add_argument(
- "--worker-pool-min",
- type=int,
- help="Minimum number of RayStages to keep in pool",
- )
- parser.add_argument("--listing-tables", action="store_true")
- args = parser.parse_args()
-
- if args.qnum > 0:
- query = tpch_query(int(args.qnum))
- else:
- query = args.query
-
- main(
- args.data,
- int(args.concurrency),
- int(args.batch_size),
- query,
- args.partitions_per_worker,
- args.worker_pool_min,
- args.listing_tables,
- )
diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py
index 85b2937..edb943e 100644
--- a/tpch/tpcbench.py
+++ b/tpch/tpcbench.py
@@ -17,24 +17,16 @@
import argparse
import ray
-from datafusion import SessionContext, SessionConfig
-from datafusion_ray import RayContext, exec_sql_on_tables, prettify,
runtime_env
+from datafusion_ray import DFRayContext, df_ray_runtime_env
+from datafusion_ray.util import exec_sql_on_tables, prettify
from datetime import datetime
import json
import os
import time
-try:
- import duckdb
-except ImportError:
- print(
- "duckdb not installed, which is used in this file for retrieving the
TPCH query"
- )
- sys.exit(1)
-
def tpch_query(qnum: int) -> str:
- query_path = os.path.join(os.path.dirname(__file__), "..", "testdata",
"queries")
+ query_path = os.path.join(os.path.dirname(__file__), "queries")
return open(os.path.join(query_path, f"q{qnum}.sql")).read()
@@ -62,9 +54,9 @@ def main(
]
# Connect to a cluster
# use ray job submit
- ray.init(runtime_env=runtime_env)
+ ray.init(runtime_env=df_ray_runtime_env)
- ctx = RayContext(
+ ctx = DFRayContext(
batch_size=batch_size,
partitions_per_worker=partitions_per_worker,
prefetch_buffer_size=prefetch_buffer_size,
@@ -76,8 +68,6 @@ def main(
ctx.set("datafusion.optimizer.enable_round_robin_repartition", "false")
ctx.set("datafusion.execution.coalesce_batches", "false")
- local_config = SessionConfig()
-
for table in table_names:
path = os.path.join(data_path, f"{table}.parquet")
print(f"Registering table {table} using path {path}")
@@ -106,8 +96,6 @@ def main(
results["local_queries"] = {}
results["validated"] = {}
- duckdb.sql("load tpch")
-
queries = range(1, 23) if qnum == -1 else [qnum]
for qnum in queries:
sql = tpch_query(qnum)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]