This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git


The following commit(s) were added to refs/heads/main by this push:
     new 8498b19  Repo Housekeeping, updating naming remove unused files (#70)
8498b19 is described below

commit 8498b1907c66a7f59e81ddc5d47f1eece9200ad1
Author: robtandy <[email protected]>
AuthorDate: Sun Mar 2 15:43:41 2025 -0500

    Repo Housekeeping, updating naming remove unused files (#70)
---
 .github/workflows/main.yml                     | 105 ++++++++++
 .github/workflows/rust.yml                     |  60 ------
 Cargo.toml                                     |   2 +-
 datafusion_ray/__init__.py                     |   6 +-
 datafusion_ray/core.py                         | 271 ++++++++++++++-----------
 datafusion_ray/util.py                         |   4 +
 examples/ray_stage.py                          |  71 -------
 examples/tips.py                               |   4 +-
 pyproject.toml                                 |   4 +-
 requirements-in.txt                            |  12 --
 src/codec.rs                                   |  18 +-
 src/context.rs                                 |  12 +-
 src/dataframe.rs                               |  35 ++--
 src/lib.rs                                     |  14 +-
 src/physical.rs                                |   7 +-
 src/{stage_service.rs => processor_service.rs} |  35 ++--
 src/proto/datafusion_ray.proto                 |   2 +-
 src/proto/generated/protobuf.rs                |   2 +-
 src/{ray_stage.rs => stage.rs}                 |  10 +-
 src/{ray_stage_reader.rs => stage_reader.rs}   |  10 +-
 src/util.rs                                    |  10 +-
 testdata/tpch/.gitignore                       |   0
 {testdata => tpch}/queries/q1.sql              |   0
 {testdata => tpch}/queries/q10.sql             |   0
 {testdata => tpch}/queries/q11.sql             |   0
 {testdata => tpch}/queries/q12.sql             |   0
 {testdata => tpch}/queries/q13.sql             |   0
 {testdata => tpch}/queries/q14.sql             |   0
 {testdata => tpch}/queries/q15.sql             |   0
 {testdata => tpch}/queries/q16.sql             |   0
 {testdata => tpch}/queries/q17.sql             |   0
 {testdata => tpch}/queries/q18.sql             |   0
 {testdata => tpch}/queries/q19.sql             |   0
 {testdata => tpch}/queries/q2.sql              |   0
 {testdata => tpch}/queries/q20.sql             |   0
 {testdata => tpch}/queries/q21.sql             |   0
 {testdata => tpch}/queries/q22.sql             |   0
 {testdata => tpch}/queries/q3.sql              |   0
 {testdata => tpch}/queries/q4.sql              |   0
 {testdata => tpch}/queries/q5.sql              |   0
 {testdata => tpch}/queries/q6.sql              |   0
 {testdata => tpch}/queries/q7.sql              |   0
 {testdata => tpch}/queries/q8.sql              |   0
 {testdata => tpch}/queries/q9.sql              |   0
 tpch/requirements.txt                          |   7 +-
 tpch/tpc.py                                    | 149 --------------
 tpch/tpcbench.py                               |  22 +-
 47 files changed, 355 insertions(+), 517 deletions(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
new file mode 100644
index 0000000..bcbe6e2
--- /dev/null
+++ b/.github/workflows/main.yml
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: main
+on:
+  push:
+    branches: [main]
+  pull_request:
+    branches: [main]
+
+#concurrency:
+#  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ 
github.workflow }}
+#cancel-in-progress: true
+
+jobs:
+  test-matrix:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version:
+          #- "3.10"
+          #- "3.11"
+          - "3.12"
+        toolchain:
+          - "stable"
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust Toolchain
+        uses: dtolnay/rust-toolchain@stable
+        id: rust-toolchain
+        with:
+          components: clippy,rustfmt
+
+      - name: Install Protoc
+        uses: arduino/setup-protoc@v3
+        with:
+          version: "27.4"
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Cache Cargo
+        uses: actions/cache@v4
+        with:
+          path: ~/.cargo
+          key: cargo-cache-${{ steps.rust-toolchain.outputs.cachekey }}-${{ 
hashFiles('Cargo.lock') }}
+
+      - name: Install dependencies and build
+        uses: astral-sh/setup-uv@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+          enable-cache: true
+
+      - name: Create virtual env
+        run: |
+          uv venv
+
+      - name: Cache the generated dataset
+        id: cache-tpch-dataset
+        uses: actions/cache@v4
+        with:
+          path: ./testdata/tpch
+          key: tpch-data
+
+      - name: create the dataset
+        if: ${{ steps.cache-tpch-dataset.outputs.cache-hit != 'true' }}
+        run: |
+          uv add duckdb
+          uv run python tpch/make_data.py 1 testdata/tpch/
+
+      - name: build and install datafusion-ray
+        env:
+          RUST_BACKTRACE: 1
+        run: |
+          uv add 'ray[default]'
+          uv run --no-project maturin develop --uv
+
+      - name: validate tpch
+        env:
+          DATAFUSION_RAY_LOG_LEVEL: debug
+          RAY_COLOR_PREFIX: 1
+          RAY_DEDUP_LOGS: 0
+        run: |
+          uv run python tpch/tpcbench.py \
+            --data='file:///${{ github.workspace }}/testdata/tpch/' \
+            --concurrency 3 \
+            --partitions-per-worker 2 \
+            --batch-size=8192 \
+            --worker-pool-min=20 \
+            --validate
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
deleted file mode 100644
index 0c6a2b8..0000000
--- a/.github/workflows/rust.yml
+++ /dev/null
@@ -1,60 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-name: Rust
-
-on:
-  push:
-  pull_request:
-
-env:
-  CARGO_TERM_COLOR: always
-  PYTHON_VERSION: 3.9
-  TPCH_SCALING_FACTOR: "1"
-  TPCH_TEST_PARTITIONS: "1"
-  TPCH_DATA_PATH: "data"
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-
-    steps:
-      - uses: actions/checkout@v4
-      - name: Install protobuf compiler
-        shell: bash
-        run: sudo apt-get install protobuf-compiler
-      - name: Build Rust code
-        run: cargo build --verbose
-      - name: Set up Python
-        uses: actions/setup-python@v5
-        with:
-          python-version: ${{ env.PYTHON_VERSION }}
-      - name: Install test dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install -r tpch/requirements.txt
-#      - name: Generate test data
-#        run: |
-#          ./scripts/gen-test-data.sh
-      - name: Run Rust tests
-        run: cargo test --verbose
-#      - name: Run Python tests
-#        run: |
-#          python -m venv venv
-#          source venv/bin/activate
-#          pip install -r requirements-in.txt
-#          maturin develop
-#          python -m pytest
diff --git a/Cargo.toml b/Cargo.toml
index 0f37c09..0638fea 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,7 +22,7 @@ homepage = "https://github.com/apache/datafusion-ray"
 repository = "https://github.com/apache/datafusion-ray"
 authors = ["Apache DataFusion <[email protected]>"]
 version = "0.1.0"
-edition = "2021"
+edition = "2024"
 readme = "README.md"
 license = "Apache-2.0"
 rust-version = "1.85"
diff --git a/datafusion_ray/__init__.py b/datafusion_ray/__init__.py
index 365b56a..4dfe339 100644
--- a/datafusion_ray/__init__.py
+++ b/datafusion_ray/__init__.py
@@ -20,6 +20,10 @@ try:
 except ImportError:
     import importlib_metadata
 
-from .core import RayContext, exec_sql_on_tables, prettify, runtime_env, 
RayStagePool
+from .core import DFRayContext, df_ray_runtime_env
+
+from . import util
+
+__all__ = ["DFRayContext", "df_ray_runtime_env", "util"]
 
 __version__ = importlib_metadata.version(__name__)
diff --git a/datafusion_ray/core.py b/datafusion_ray/core.py
index 3e4cfe8..70955bc 100644
--- a/datafusion_ray/core.py
+++ b/datafusion_ray/core.py
@@ -23,15 +23,13 @@ import os
 import pyarrow as pa
 import asyncio
 import ray
-import json
 import time
 
 from .friendly import new_friendly_name
 
 from datafusion_ray._datafusion_ray_internal import (
-    RayContext as RayContextInternal,
-    RayDataFrame as RayDataFrameInternal,
-    exec_sql_on_tables,
+    DFRayContext as DFRayContextInternal,
+    DFRayDataFrame as DFRayDataFrameInternal,
     prettify,
 )
 
@@ -52,7 +50,7 @@ setup_logging()
 
 _log_level = os.environ.get("DATAFUSION_RAY_LOG_LEVEL", "ERROR").upper()
 _rust_backtrace = os.environ.get("RUST_BACKTRACE", "0")
-runtime_env = {
+df_ray_runtime_env = {
     "worker_process_setup_hook": setup_logging,
     "env_vars": {
         "DATAFUSION_RAY_LOG_LEVEL": _log_level,
@@ -88,7 +86,9 @@ async def wait_for(coros, name=""):
     # wrap the coro in a task to work with python 3.10 and 3.11+ where 
asyncio.wait semantics
     # changed to not accept any awaitable
     start = time.time()
-    done, _ = await asyncio.wait([asyncio.create_task(_ensure_coro(c)) for c 
in coros])
+    done, _ = await asyncio.wait(
+        [asyncio.create_task(_ensure_coro(c)) for c in coros]
+    )
     end = time.time()
     log.info(f"waiting for {name} took {end - start}s")
     for d in done:
@@ -101,61 +101,63 @@ async def wait_for(coros, name=""):
     return return_values
 
 
-class RayStagePool:
-    """A pool of RayStage actors that can be acquired and released"""
+class DFRayProcessorPool:
+    """A pool of DFRayProcessor actors that can be acquired and released"""
 
     # TODO: We can probably manage this set in a better way
-    # This is not a threadsafe implementation.
+    # This is not a threadsafe implementation, though the 
DFRayContextSupervisor accesses it
+    # from a single thread
+    #
     # This is simple though and will suffice for now
 
     def __init__(self, min_workers: int, max_workers: int):
         self.min_workers = min_workers
         self.max_workers = max_workers
 
-        # a map of stage_key (a random identifier) to stage actor reference
+        # a map of processor_key (a random identifier) to stage actor reference
         self.pool = {}
-        # a map of stage_key to listening address
+        # a map of processor_key to listening address
         self.addrs = {}
 
-        # holds object references from the start_up method for each stage
-        # we know all stages are listening when all of these refs have
+        # holds object references from the start_up method for each processor
+        # we know all processors are listening when all of these refs have
         # been waited on.  When they are ready we remove them from this set
-        self.stages_started = set()
+        self.processors_started = set()
 
-        # an event that is set when all stages are ready to serve
-        self.stages_ready = asyncio.Event()
+        # an event that is set when all processors are ready to serve
+        self.processors_ready = asyncio.Event()
 
-        # stages that are started but we need to get their address
+        # processors that are started but we need to get their address
         self.need_address = set()
 
-        # stages that we have the address for but need to start serving
+        # processors that we have the address for but need to start serving
         self.need_serving = set()
 
-        # stages in use
+        # processors in use
         self.acquired = set()
 
-        # stages available
+        # processors available
         self.available = set()
 
         for _ in range(min_workers):
-            self._new_stage()
+            self._new_processor()
 
         log.info(
-            f"created ray stage pool (min_workers: {min_workers}, max_workers: 
{max_workers})"
+            f"created ray processor pool (min_workers: {min_workers}, 
max_workers: {max_workers})"
         )
 
     async def start(self):
-        if not self.stages_ready.is_set():
-            await self._wait_for_stages_started()
+        if not self.processors_ready.is_set():
+            await self._wait_for_processors_started()
             await self._wait_for_get_addrs()
             await self._wait_for_serve()
-            self.stages_ready.set()
+            self.processors_ready.set()
 
     async def wait_for_ready(self):
-        await self.stages_ready.wait()
+        await self.processors_ready.wait()
 
     async def acquire(self, need=1):
-        stage_keys = []
+        processor_keys = []
 
         have = len(self.available)
         total = len(self.available) + len(self.acquired)
@@ -164,61 +166,67 @@ class RayStagePool:
         need_to_make = need - have
 
         if need_to_make > can_make:
-            raise Exception(f"Cannot allocate workers above 
{self.max_workers}")
+            raise Exception(
+                f"Cannot allocate workers above {self.max_workers}"
+            )
 
         if need_to_make > 0:
-            log.debug(f"creating {need_to_make} additional stages")
+            log.debug(f"creating {need_to_make} additional processors")
             for _ in range(need_to_make):
-                self._new_stage()
-            await wait_for([self.start()], "waiting for created stages")
+                self._new_processor()
+            await wait_for([self.start()], "waiting for created processors")
 
         assert len(self.available) >= need
 
         for _ in range(need):
-            stage_key = self.available.pop()
-            self.acquired.add(stage_key)
-
-            stage_keys.append(stage_key)
-
-        stages = [self.pool[sk] for sk in stage_keys]
-        addrs = [self.addrs[sk] for sk in stage_keys]
-        return (stages, stage_keys, addrs)
-
-    def release(self, stage_keys: list[str]):
-        for stage_key in stage_keys:
-            self.acquired.remove(stage_key)
-            self.available.add(stage_key)
-
-    def _new_stage(self):
-        self.stages_ready.clear()
-        stage_key = new_friendly_name()
-        log.debug(f"starting stage: {stage_key}")
-        stage = RayStage.options(name=f"Stage: {stage_key}").remote(stage_key)
-        self.pool[stage_key] = stage
-        self.stages_started.add(stage.start_up.remote())
-        self.available.add(stage_key)
-
-    async def _wait_for_stages_started(self):
-        log.info("waiting for stages to be ready")
-        started_keys = await wait_for(self.stages_started, "stages to be 
started")
-        # we need the addresses of these stages still
+            processor_key = self.available.pop()
+            self.acquired.add(processor_key)
+
+            processor_keys.append(processor_key)
+
+        processors = [self.pool[sk] for sk in processor_keys]
+        addrs = [self.addrs[sk] for sk in processor_keys]
+        return (processors, processor_keys, addrs)
+
+    def release(self, processor_keys: list[str]):
+        for processor_key in processor_keys:
+            self.acquired.remove(processor_key)
+            self.available.add(processor_key)
+
+    def _new_processor(self):
+        self.processors_ready.clear()
+        processor_key = new_friendly_name()
+        log.debug(f"starting processor: {processor_key}")
+        processor = DFRayProcessor.options(
+            name=f"Processor : {processor_key}"
+        ).remote(processor_key)
+        self.pool[processor_key] = processor
+        self.processors_started.add(processor.start_up.remote())
+        self.available.add(processor_key)
+
+    async def _wait_for_processors_started(self):
+        log.info("waiting for processors to be ready")
+        started_keys = await wait_for(
+            self.processors_started, "processors to be started"
+        )
+        # we need the addresses of these processors still
         self.need_address.update(set(started_keys))
-        # we've started all the stages we know about
-        self.stages_started = set()
-        log.info("stages are all listening")
+        # we've started all the processors we know about
+        self.processors_started = set()
+        log.info("processors are all listening")
 
     async def _wait_for_get_addrs(self):
         # get the addresses in a pipelined fashion
         refs = []
-        stage_keys = []
-        for stage_key in self.need_address:
-            stage = self.pool[stage_key]
-            refs.append(stage.addr.remote())
-            stage_keys.append(stage_key)
+        processor_keys = []
+        for processor_key in self.need_address:
+            processor = self.pool[processor_key]
+            refs.append(processor.addr.remote())
+            processor_keys.append(processor_key)
 
-            self.need_serving.add(stage_key)
+            self.need_serving.add(processor_key)
 
-        addrs = await wait_for(refs, "stage addresses")
+        addrs = await wait_for(refs, "processor addresses")
 
         for key, addr in addrs:
             self.addrs[key] = addr
@@ -226,45 +234,49 @@ class RayStagePool:
         self.need_address = set()
 
     async def _wait_for_serve(self):
-        log.info("running stages")
+        log.info("running processors")
         try:
-            for stage_key in self.need_serving:
-                log.info(f"starting serving of stage {stage_key}")
-                stage = self.pool[stage_key]
-                stage.serve.remote()
+            for processor_key in self.need_serving:
+                log.info(f"starting serving of processor {processor_key}")
+                processor = self.pool[processor_key]
+                processor.serve.remote()
             self.need_serving = set()
 
         except Exception as e:
-            log.error(f"StagePool: Uhandled Exception in serve: {e}")
+            log.error(f"ProcessorPool: Unhandled Exception in serve: {e}")
             raise e
 
     async def all_done(self):
-        log.info("calling stage all done")
-        refs = [stage.all_done.remote() for stage in self.pool.values()]
-        await wait_for(refs, "stages to be all done")
-        log.info("all stages shutdown")
+        log.info("calling processor all done")
+        refs = [
+            processor.all_done.remote() for processor in self.pool.values()
+        ]
+        await wait_for(refs, "processors to be all done")
+        log.info("all processors shutdown")
 
 
 @ray.remote(num_cpus=0)
-class RayStage:
-    def __init__(self, stage_key):
-        self.stage_key = stage_key
+class DFRayProcessor:
+    def __init__(self, processor_key):
+        self.processor_key = processor_key
 
         # import this here so ray doesn't try to serialize the rust extension
-        from datafusion_ray._datafusion_ray_internal import StageService
+        from datafusion_ray._datafusion_ray_internal import (
+            DFRayProcessorService,
+        )
 
-        self.stage_service = StageService(stage_key)
+        self.processor_service = DFRayProcessorService(processor_key)
 
     async def start_up(self):
         # this method is sync
-        self.stage_service.start_up()
-        return self.stage_key
+        self.processor_service.start_up()
+        return self.processor_key
 
     async def all_done(self):
-        await self.stage_service.all_done()
+        await self.processor_service.all_done()
 
     async def addr(self):
-        return (self.stage_key, self.stage_service.addr())
+        return (self.processor_key, self.processor_service.addr())
 
     async def update_plan(
         self,
@@ -273,7 +285,7 @@ class RayStage:
         partition_group: list[int],
         plan_bytes: bytes,
     ):
-        await self.stage_service.update_plan(
+        await self.processor_service.update_plan(
             stage_id,
             stage_addrs,
             partition_group,
@@ -281,9 +293,11 @@ class RayStage:
         )
 
     async def serve(self):
-        log.info(f"[{self.stage_key}] serving on {self.stage_service.addr()}")
-        await self.stage_service.serve()
-        log.info(f"[{self.stage_key}] done serving")
+        log.info(
+            f"[{self.processor_key}] serving on 
{self.processor_service.addr()}"
+        )
+        await self.processor_service.serve()
+        log.info(f"[{self.processor_key}] done serving")
 
 
 @dataclass
@@ -304,7 +318,7 @@ class InternalStageData:
     child_stage_ids: list[int]
     num_output_partitions: int
     full_partitions: bool
-    remote_stage: ...  # ray.actor.ActorHandle[RayStage]
+    remote_processor: ...  # ray.actor.ActorHandle[DFRayProcessor]
     remote_addr: str
 
     def __str__(self):
@@ -312,16 +326,18 @@ class InternalStageData:
 
 
 @ray.remote(num_cpus=0)
-class RayContextSupervisor:
+class DFRayContextSupervisor:
     def __init__(
         self,
         worker_pool_min: int,
         worker_pool_max: int,
     ) -> None:
-        log.info(f"Creating RayContextSupervisor worker_pool_min: 
{worker_pool_min}")
-        self.pool = RayStagePool(worker_pool_min, worker_pool_max)
+        log.info(
+            f"Creating DFRayContextSupervisor worker_pool_min: 
{worker_pool_min}"
+        )
+        self.pool = DFRayProcessorPool(worker_pool_min, worker_pool_max)
         self.stages: dict[str, InternalStageData] = {}
-        log.info("Created RayContextSupervisor")
+        log.info("Created DFRayContextSupervisor")
 
     async def start(self):
         await self.pool.start()
@@ -331,7 +347,9 @@ class RayContextSupervisor:
 
     async def get_stage_addrs(self, stage_id: int):
         addrs = [
-            sd.remote_addr for sd in self.stages.values() if sd.stage_id == 
stage_id
+            sd.remote_addr
+            for sd in self.stages.values()
+            if sd.stage_id == stage_id
         ]
         return addrs
 
@@ -342,23 +360,23 @@ class RayContextSupervisor:
         if len(self.stages) > 0:
             self.pool.release(list(self.stages.keys()))
 
-        remote_stages, remote_stage_keys, remote_addrs = await 
self.pool.acquire(
-            len(stage_datas)
+        remote_processors, remote_processor_keys, remote_addrs = (
+            await self.pool.acquire(len(stage_datas))
         )
         self.stages = {}
 
         for i, sd in enumerate(stage_datas):
-            remote_stage = remote_stages[i]
-            remote_stage_key = remote_stage_keys[i]
+            remote_processor = remote_processors[i]
+            remote_processor_key = remote_processor_keys[i]
             remote_addr = remote_addrs[i]
-            self.stages[remote_stage_key] = InternalStageData(
+            self.stages[remote_processor_key] = InternalStageData(
                 sd.stage_id,
                 sd.plan_bytes,
                 sd.partition_group,
                 sd.child_stage_ids,
                 sd.num_output_partitions,
                 sd.full_partitions,
-                remote_stage,
+                remote_processor,
                 remote_addr,
             )
 
@@ -379,9 +397,12 @@ class RayContextSupervisor:
             log.info(f"going to update plan for {stage_key}")
             kid = addrs_by_stage_key[stage_key]
             refs.append(
-                isd.remote_stage.update_plan.remote(
+                isd.remote_processor.update_plan.remote(
                     isd.stage_id,
-                    {stage_id: val["child_addrs"] for (stage_id, val) in 
kid.items()},
+                    {
+                        stage_id: val["child_addrs"]
+                        for (stage_id, val) in kid.items()
+                    },
                     isd.partition_group,
                     isd.plan_bytes,
                 )
@@ -413,7 +434,9 @@ class RayContextSupervisor:
                 ]
 
                 # sanity check
-                assert all([op == output_partitions[0] for op in 
output_partitions])
+                assert all(
+                    [op == output_partitions[0] for op in output_partitions]
+                )
                 output_partitions = output_partitions[0]
 
                 for child_stage_isd in child_stage_datas:
@@ -440,16 +463,16 @@ class RayContextSupervisor:
         await self.pool.all_done()
 
 
-class RayDataFrame:
+class DFRayDataFrame:
     def __init__(
         self,
-        ray_internal_df: RayDataFrameInternal,
-        supervisor,  # ray.actor.ActorHandle[RayContextSupervisor],
+        internal_df: DFRayDataFrameInternal,
+        supervisor,  # ray.actor.ActorHandle[DFRayContextSupervisor],
         batch_size=8192,
         partitions_per_worker: int | None = None,
         prefetch_buffer_size=0,
     ):
-        self.df = ray_internal_df
+        self.df = internal_df
         self.supervisor = supervisor
         self._stages = None
         self._batches = None
@@ -461,7 +484,9 @@ class RayDataFrame:
         # create our coordinator now, which we need to create stages
         if not self._stages:
             self._stages = self.df.stages(
-                self.batch_size, self.prefetch_buffer_size, 
self.partitions_per_worker
+                self.batch_size,
+                self.prefetch_buffer_size,
+                self.partitions_per_worker,
             )
 
         return self._stages
@@ -495,7 +520,9 @@ class RayDataFrame:
             )
             log.debug(f"last stage addrs {last_stage_addrs}")
 
-            reader = self.df.read_final_stage(last_stage_id, 
last_stage_addrs[0])
+            reader = self.df.read_final_stage(
+                last_stage_id, last_stage_addrs[0]
+            )
             log.debug("got reader")
             self._batches = list(reader)
         return self._batches
@@ -528,7 +555,7 @@ class RayDataFrame:
         call_sync(wait_for([ref], "creating ray stages"))
 
 
-class RayContext:
+class DFRayContext:
     def __init__(
         self,
         batch_size: int = 8192,
@@ -537,12 +564,12 @@ class RayContext:
         worker_pool_min: int = 1,
         worker_pool_max: int = 100,
     ) -> None:
-        self.ctx = RayContextInternal()
+        self.ctx = DFRayContextInternal()
         self.batch_size = batch_size
         self.partitions_per_worker = partitions_per_worker
         self.prefetch_buffer_size = prefetch_buffer_size
 
-        self.supervisor = RayContextSupervisor.options(
+        self.supervisor = DFRayContextSupervisor.options(
             name="RayContextSupersisor",
         ).remote(
             worker_pool_min,
@@ -564,14 +591,16 @@ class RayContext:
     def register_parquet(self, name: str, path: str):
         self.ctx.register_parquet(name, path)
 
-    def register_listing_table(self, name: str, path: str, 
file_extention="parquet"):
+    def register_listing_table(
+        self, name: str, path: str, file_extention="parquet"
+    ):
         self.ctx.register_listing_table(name, path, file_extention)
 
-    def sql(self, query: str) -> RayDataFrame:
+    def sql(self, query: str) -> DFRayDataFrame:
 
         df = self.ctx.sql(query)
 
-        return RayDataFrame(
+        return DFRayDataFrame(
             df,
             self.supervisor,
             self.batch_size,
@@ -583,8 +612,6 @@ class RayContext:
         self.ctx.set(option, value)
 
     def __del__(self):
-        log.info("RayContext, cleaning up remote resources")
+        log.info("DFRayContext, cleaning up remote resources")
         ref = self.supervisor.all_done.remote()
-        call_sync(wait_for([ref], "RayContextSupervisor all done"))
-
-    # log.debug("all stage addrs set? or should be")
+        call_sync(wait_for([ref], "DFRayContextSupervisor all done"))
diff --git a/datafusion_ray/util.py b/datafusion_ray/util.py
new file mode 100644
index 0000000..d3fecf8
--- /dev/null
+++ b/datafusion_ray/util.py
@@ -0,0 +1,4 @@
+from datafusion_ray._datafusion_ray_internal import (
+    exec_sql_on_tables,
+    prettify,
+)
diff --git a/examples/ray_stage.py b/examples/ray_stage.py
deleted file mode 100644
index 72e983e..0000000
--- a/examples/ray_stage.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import argparse
-import time
-import os
-import ray
-from datafusion_ray import RayContext
-
-
-def go(data_dir: str, concurrency: int, isolate: bool):
-    print(f"isolate {isolate}")
-    ctx = RayContext(
-        isolate_partitions=isolate, bucket="rob-tandy-tmp", batch_size=1024
-    )
-    ctx.set("datafusion.execution.target_partitions", str(concurrency))
-    ctx.set("datafusion.catalog.information_schema", "true")
-    ctx.set("datafusion.optimizer.enable_round_robin_repartition", "false")
-    ctx.set("datafusion.execution.coalesce_batches", "false")
-
-    for table in [
-        "customer",
-        "orders",
-    ]:
-        f = os.path.join(data_dir, f"{table}.parquet")
-        print("Registering table", table, "using path", f)
-        ctx.register_parquet(table, f)
-
-    query = """SELECT customer.c_name, sum(orders.o_totalprice) as total_amount
-    FROM customer JOIN orders ON customer.c_custkey = orders.o_custkey
-    GROUP BY customer.c_name order by total_amount desc limit 10"""
-
-    # query = """SELECT count(customer.c_name), customer.c_mktsegment from 
customer group by customer.c_mktsegment limit 10"""
-
-    df = ctx.sql(query)
-    for stage in df.stages():
-        print("Stage ", stage.stage_id)
-        print(stage.execution_plan().display_indent())
-
-    df.show()
-
-    time.sleep(3)
-
-
-if __name__ == "__main__":
-    ray.init(namespace="example")
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--data", required=True, help="path to tpch*.parquet 
files")
-    parser.add_argument("--concurrency", required=True, type=int)
-    parser.add_argument(
-        "--isolate",
-        action="store_true",
-        help="do each partition as a separate ray actor, more concurrency",
-    )
-    args = parser.parse_args()
-
-    go(args.data, args.concurrency, args.isolate)
diff --git a/examples/tips.py b/examples/tips.py
index 2df233b..7d72ba5 100644
--- a/examples/tips.py
+++ b/examples/tips.py
@@ -19,11 +19,11 @@ import argparse
 import datafusion
 import ray
 
-from datafusion_ray import RayContext
+from datafusion_ray import DFRayContext
 
 
 def go(data_dir: str):
-    ctx = RayContext()
+    ctx = DFRayContext()
     # we could set this value to however many CPUs we plan to give each
     # ray task
     ctx.set("datafusion.execution.target_partitions", "1")
diff --git a/pyproject.toml b/pyproject.toml
index b81483b..32aa6e3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,14 +21,14 @@ build-backend = "maturin"
 
 [project]
 name = "datafusion-ray"
-requires-python = ">=3.7"
+requires-python = ">=3.10"
 classifiers = [
   "Programming Language :: Rust",
   "Programming Language :: Python :: Implementation :: CPython",
   "Programming Language :: Python :: Implementation :: PyPy",
 ]
 dependencies = [
-  "datafusion>=43.0.0",
+  "datafusion>=45.0.0",
   "pyarrow>=18.0.0",
   "typing-extensions;python_version<'3.13'",
 ]
diff --git a/requirements-in.txt b/requirements-in.txt
deleted file mode 100644
index 4812e1b..0000000
--- a/requirements-in.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-black
-flake8
-isort
-maturin
-mypy
-numpy
-pyarrow>=18.0.0
-pytest
-ray==2.40.0
-datafusion==43.1.0
-toml
-importlib_metadata; python_version < "3.8"
diff --git a/src/codec.rs b/src/codec.rs
index ade208d..b817e13 100644
--- a/src/codec.rs
+++ b/src/codec.rs
@@ -5,7 +5,7 @@ use crate::{
     max_rows::MaxRowsExec,
     pre_fetch::PrefetchExec,
     protobuf::{
-        MaxRowsExecNode, PartitionIsolatorExecNode, PrefetchExecNode, 
RayStageReaderExecNode,
+        DfRayStageReaderExecNode, MaxRowsExecNode, PartitionIsolatorExecNode, 
PrefetchExecNode,
     },
 };
 
@@ -24,7 +24,7 @@ use datafusion_proto::protobuf;
 
 use prost::Message;
 
-use crate::ray_stage_reader::RayStageReaderExec;
+use crate::stage_reader::DFRayStageReaderExec;
 
 #[derive(Debug)]
 /// Physical Extension Codec for DataFusion Ray plans
@@ -49,7 +49,7 @@ impl PhysicalExtensionCodec for RayCodec {
                     node.partition_count as usize,
                 )))
             }
-        } else if let Ok(node) = RayStageReaderExecNode::decode(buf) {
+        } else if let Ok(node) = DfRayStageReaderExecNode::decode(buf) {
             let schema: Schema = node
                 .schema
                 .as_ref()
@@ -64,7 +64,7 @@ impl PhysicalExtensionCodec for RayCodec {
             )?
             .ok_or(internal_datafusion_err!("missing partitioning in proto"))?;
 
-            Ok(Arc::new(RayStageReaderExec::try_new(
+            Ok(Arc::new(DFRayStageReaderExec::try_new(
                 part,
                 Arc::new(schema),
                 node.stage_id as usize,
@@ -99,14 +99,14 @@ impl PhysicalExtensionCodec for RayCodec {
     }
 
     fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> 
Result<()> {
-        if let Some(reader) = 
node.as_any().downcast_ref::<RayStageReaderExec>() {
+        if let Some(reader) = 
node.as_any().downcast_ref::<DFRayStageReaderExec>() {
             let schema: protobuf::Schema = reader.schema().try_into()?;
             let partitioning: protobuf::Partitioning = serialize_partitioning(
                 reader.properties().output_partitioning(),
                 &DefaultPhysicalExtensionCodec {},
             )?;
 
-            let pb = RayStageReaderExecNode {
+            let pb = DfRayStageReaderExecNode {
                 schema: Some(schema),
                 partitioning: Some(partitioning),
                 stage_id: reader.stage_id as u64,
@@ -151,7 +151,7 @@ impl PhysicalExtensionCodec for RayCodec {
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::ray_stage_reader::RayStageReaderExec;
+    use crate::stage_reader::DFRayStageReaderExec;
     use arrow::datatypes::DataType;
     use datafusion::{
         physical_plan::{display::DisplayableExecutionPlan, displayable, 
Partitioning},
@@ -169,7 +169,7 @@ mod test {
         ]));
         let ctx = SessionContext::new();
         let part = Partitioning::UnknownPartitioning(2);
-        let exec = Arc::new(RayStageReaderExec::try_new(part, schema, 
1).unwrap());
+        let exec = Arc::new(DFRayStageReaderExec::try_new(part, schema, 
1).unwrap());
         let codec = RayCodec {};
         let mut buf = vec![];
         codec.try_encode(exec.clone(), &mut buf).unwrap();
@@ -185,7 +185,7 @@ mod test {
         let ctx = SessionContext::new();
         let part = Partitioning::UnknownPartitioning(2);
         let exec = Arc::new(MaxRowsExec::new(
-            Arc::new(RayStageReaderExec::try_new(part, schema, 1).unwrap()),
+            Arc::new(DFRayStageReaderExec::try_new(part, schema, 1).unwrap()),
             10,
         ));
         let codec = RayCodec {};
diff --git a/src/context.rs b/src/context.rs
index d99962c..191d632 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -23,20 +23,20 @@ use object_store::aws::AmazonS3Builder;
 use pyo3::prelude::*;
 use std::sync::Arc;
 
-use crate::dataframe::RayDataFrame;
+use crate::dataframe::DFRayDataFrame;
 use crate::physical::RayStageOptimizerRule;
 use crate::util::ResultExt;
 use url::Url;
 
-/// Internal Session Context object for the python class RayContext
+/// Internal Session Context object for the python class DFRayContext
 #[pyclass]
-pub struct RayContext {
+pub struct DFRayContext {
     /// our datafusion context
     ctx: SessionContext,
 }
 
 #[pymethods]
-impl RayContext {
+impl DFRayContext {
     #[new]
     pub fn new() -> PyResult<Self> {
         let rule = RayStageOptimizerRule::new();
@@ -93,10 +93,10 @@ impl RayContext {
         .to_py_err()
     }
 
-    pub fn sql(&self, py: Python, query: String) -> PyResult<RayDataFrame> {
+    pub fn sql(&self, py: Python, query: String) -> PyResult<DFRayDataFrame> {
         let df = wait_for_future(py, self.ctx.sql(&query))?;
 
-        Ok(RayDataFrame::new(df))
+        Ok(DFRayDataFrame::new(df))
     }
 
     pub fn set(&self, option: String, value: String) -> PyResult<()> {
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 56e76fb..adecf6b 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -48,14 +48,14 @@ use tokio::sync::Mutex;
 use crate::isolator::PartitionIsolatorExec;
 use crate::max_rows::MaxRowsExec;
 use crate::pre_fetch::PrefetchExec;
-use crate::ray_stage::RayStageExec;
-use crate::ray_stage_reader::RayStageReaderExec;
+use crate::stage::DFRayStageExec;
+use crate::stage_reader::DFRayStageReaderExec;
 use crate::util::collect_from_stage;
 use crate::util::display_plan_with_partition_counts;
 use crate::util::physical_plan_to_bytes;
 use crate::util::ResultExt;
 
-/// Internal rust class beyind the RayDataFrame python object
+/// Internal rust class behind the DFRayDataFrame python object
 ///
 /// It is a container for a plan for a query, as we would expect.
 ///
@@ -67,7 +67,7 @@ use crate::util::ResultExt;
 /// The second role of this object is to be able to fetch record batches from 
the final_
 /// stage in the plan and return them to python.
 #[pyclass]
-pub struct RayDataFrame {
+pub struct DFRayDataFrame {
     /// holds the logical plan of the query we will execute
     df: DataFrame,
     /// the physical plan we will use to consume the final stage.
@@ -75,7 +75,7 @@ pub struct RayDataFrame {
     final_plan: Option<Arc<dyn ExecutionPlan>>,
 }
 
-impl RayDataFrame {
+impl DFRayDataFrame {
     pub fn new(df: DataFrame) -> Self {
         Self {
             df,
@@ -85,7 +85,7 @@ impl RayDataFrame {
 }
 
 #[pymethods]
-impl RayDataFrame {
+impl DFRayDataFrame {
     #[pyo3(signature = (batch_size, prefetch_buffer_size, 
partitions_per_worker=None))]
     fn stages(
         &mut self,
@@ -93,7 +93,7 @@ impl RayDataFrame {
         batch_size: usize,
         prefetch_buffer_size: usize,
         partitions_per_worker: Option<usize>,
-    ) -> PyResult<Vec<PyDataFrameStage>> {
+    ) -> PyResult<Vec<PyDFRayStage>> {
         let mut stages = vec![];
 
         // TODO: This can be done more efficiently, likely in one pass but I'm
@@ -109,7 +109,7 @@ impl RayDataFrame {
                 display_plan_with_partition_counts(&plan)
             );
 
-            if let Some(stage_exec) = 
plan.as_any().downcast_ref::<RayStageExec>() {
+            if let Some(stage_exec) = 
plan.as_any().downcast_ref::<DFRayStageExec>() {
                 let input = plan.children();
                 assert!(input.len() == 1, "RayStageExec must have exactly one 
child");
                 let input = input[0];
@@ -120,7 +120,7 @@ impl RayDataFrame {
                     plan.output_partitioning().partition_count()
                 );
 
-                let replacement = Arc::new(RayStageReaderExec::try_new(
+                let replacement = Arc::new(DFRayStageReaderExec::try_new(
                     plan.output_partitioning().clone(),
                     input.schema(),
                     stage_exec.stage_id,
@@ -145,7 +145,7 @@ impl RayDataFrame {
                 displayable(plan.as_ref()).one_line()
             );
 
-            if let Some(stage_exec) = 
plan.as_any().downcast_ref::<RayStageExec>() {
+            if let Some(stage_exec) = 
plan.as_any().downcast_ref::<DFRayStageExec>() {
                 trace!("ray stage exec");
                 let input = plan.children();
                 assert!(input.len() == 1, "RayStageExec must have exactly one 
child");
@@ -153,7 +153,7 @@ impl RayDataFrame {
 
                 let fixed_plan = input.clone().transform_down(down)?.data;
 
-                let stage = PyDataFrameStage::new(
+                let stage = PyDFRayStage::new(
                     stage_exec.stage_id,
                     fixed_plan,
                     partition_groups.clone(),
@@ -231,7 +231,7 @@ impl RayDataFrame {
             return internal_err!("Last stage expected to have one 
partition").to_py_err();
         }
 
-        last_stage = PyDataFrameStage::new(
+        last_stage = PyDFRayStage::new(
             last_stage.stage_id,
             Arc::new(MaxRowsExec::new(
                 Arc::new(CoalesceBatchesExec::new(last_stage.plan, batch_size))
@@ -244,7 +244,7 @@ impl RayDataFrame {
 
         // done fixing last stage
 
-        let reader_plan = Arc::new(RayStageReaderExec::try_new_from_input(
+        let reader_plan = Arc::new(DFRayStageReaderExec::try_new_from_input(
             last_stage.plan.clone(),
             last_stage.stage_id,
         )?) as Arc<dyn ExecutionPlan>;
@@ -298,6 +298,7 @@ impl RayDataFrame {
     }
 }
 
+#[allow(clippy::type_complexity)]
 fn build_replacement(
     plan: Arc<dyn ExecutionPlan>,
     prefetch_buffer_size: usize,
@@ -351,7 +352,7 @@ fn build_replacement(
 
 /// A Python class to hold a Physical plan of a single stage
 #[pyclass]
-pub struct PyDataFrameStage {
+pub struct PyDFRayStage {
     /// our stage id
     stage_id: usize,
     /// the physical plan of our stage
@@ -364,7 +365,7 @@ pub struct PyDataFrameStage {
     /// CombinedRecordBatchStream
     full_partitions: bool,
 }
-impl PyDataFrameStage {
+impl PyDFRayStage {
     fn new(
         stage_id: usize,
         plan: Arc<dyn ExecutionPlan>,
@@ -381,7 +382,7 @@ impl PyDataFrameStage {
 }
 
 #[pymethods]
-impl PyDataFrameStage {
+impl PyDFRayStage {
     #[getter]
     fn stage_id(&self) -> usize {
         self.stage_id
@@ -410,7 +411,7 @@ impl PyDataFrameStage {
         self.plan
             .clone()
             .transform_down(|node: Arc<dyn ExecutionPlan>| {
-                if let Some(reader) = 
node.as_any().downcast_ref::<RayStageReaderExec>() {
+                if let Some(reader) = 
node.as_any().downcast_ref::<DFRayStageReaderExec>() {
                     result.push(reader.stage_id);
                 }
                 Ok(Transformed::no(node))
diff --git a/src/lib.rs b/src/lib.rs
index 0a7e04f..5158f5c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,18 +31,18 @@ pub mod isolator;
 pub mod max_rows;
 pub mod physical;
 pub mod pre_fetch;
-pub mod ray_stage;
-pub mod ray_stage_reader;
-pub mod stage_service;
+pub mod processor_service;
+pub mod stage;
+pub mod stage_reader;
 pub mod util;
 
 #[pymodule]
 fn _datafusion_ray_internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
     setup_logging();
-    m.add_class::<context::RayContext>()?;
-    m.add_class::<dataframe::RayDataFrame>()?;
-    m.add_class::<dataframe::PyDataFrameStage>()?;
-    m.add_class::<stage_service::StageService>()?;
+    m.add_class::<context::DFRayContext>()?;
+    m.add_class::<dataframe::DFRayDataFrame>()?;
+    m.add_class::<dataframe::PyDFRayStage>()?;
+    m.add_class::<processor_service::DFRayProcessorService>()?;
     m.add_function(wrap_pyfunction!(util::prettify, m)?)?;
     m.add_function(wrap_pyfunction!(util::exec_sql_on_tables, m)?)?;
     Ok(())
diff --git a/src/physical.rs b/src/physical.rs
index 428ae94..9e05ad1 100644
--- a/src/physical.rs
+++ b/src/physical.rs
@@ -25,7 +25,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use log::debug;
 use std::sync::Arc;
 
-use crate::ray_stage::RayStageExec;
+use crate::stage::DFRayStageExec;
 use crate::util::display_plan_with_partition_counts;
 
 /// This optimizer rule walks up the physical plan tree
@@ -70,7 +70,7 @@ impl PhysicalOptimizerRule for RayStageOptimizerRule {
                 || plan.as_any().downcast_ref::<SortExec>().is_some()
                 || plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some()
             {
-                let stage = Arc::new(RayStageExec::new(plan, stage_counter));
+                let stage = Arc::new(DFRayStageExec::new(plan, stage_counter));
                 stage_counter += 1;
                 Ok(Transformed::yes(stage as Arc<dyn ExecutionPlan>))
             } else {
@@ -79,7 +79,8 @@ impl PhysicalOptimizerRule for RayStageOptimizerRule {
         };
 
         let plan = plan.transform_up(up)?.data;
-        let final_plan = Arc::new(RayStageExec::new(plan, stage_counter)) as 
Arc<dyn ExecutionPlan>;
+        let final_plan =
+            Arc::new(DFRayStageExec::new(plan, stage_counter)) as Arc<dyn 
ExecutionPlan>;
 
         debug!(
             "optimized physical plan:\n{}",
diff --git a/src/stage_service.rs b/src/processor_service.rs
similarity index 93%
rename from src/stage_service.rs
rename to src/processor_service.rs
index 949bae2..5164577 100644
--- a/src/stage_service.rs
+++ b/src/processor_service.rs
@@ -50,8 +50,8 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
 use crate::flight::{FlightHandler, FlightServ};
 use crate::isolator::PartitionGroup;
 use crate::util::{
-    bytes_to_physical_plan, display_plan_with_partition_counts, extract_ticket,
-    input_stage_ids, make_client, ResultExt,
+    bytes_to_physical_plan, display_plan_with_partition_counts, 
extract_ticket, input_stage_ids,
+    make_client, ResultExt,
 };
 
 /// a map of stage_id, partition to a list FlightClients that can serve
@@ -59,23 +59,23 @@ use crate::util::{
 /// will consume the partition from all clients and merge the results.
 pub(crate) struct ServiceClients(pub HashMap<(usize, usize), 
Mutex<Vec<FlightClient>>>);
 
-/// StageHandler is a [`FlightHandler`] that serves streams of partitions from 
a hosted Physical Plan
+/// DFRayProcessorHandler is a [`FlightHandler`] that serves streams of 
partitions from a hosted Physical Plan
 /// It only responds to the DoGet Arrow Flight method.
-struct StageHandler {
+struct DFRayProcessorHandler {
     /// our name, useful for logging
     name: String,
     /// Inner state of the handler
-    inner: RwLock<Option<StageHandlerInner>>,
+    inner: RwLock<Option<DFRayProcessorHandlerInner>>,
 }
 
-struct StageHandlerInner {
+struct DFRayProcessorHandlerInner {
     /// the physical plan that comprises our stage
     pub(crate) plan: Arc<dyn ExecutionPlan>,
     /// the session context we will use to execute the plan
     pub(crate) ctx: SessionContext,
 }
 
-impl StageHandler {
+impl DFRayProcessorHandler {
     pub fn new(name: String) -> Self {
         let inner = RwLock::new(None);
 
@@ -88,13 +88,14 @@ impl StageHandler {
         plan: Arc<dyn ExecutionPlan>,
         partition_group: Vec<usize>,
     ) -> DFResult<()> {
-        let inner = StageHandlerInner::new(stage_id, stage_addrs, plan, 
partition_group).await?;
+        let inner =
+            DFRayProcessorHandlerInner::new(stage_id, stage_addrs, plan, 
partition_group).await?;
         self.inner.write().replace(inner);
         Ok(())
     }
 }
 
-impl StageHandlerInner {
+impl DFRayProcessorHandlerInner {
     pub async fn new(
         stage_id: usize,
         stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
@@ -169,7 +170,7 @@ impl StageHandlerInner {
 }
 
 fn make_stream(
-    inner: &StageHandlerInner,
+    inner: &DFRayProcessorHandlerInner,
     partition: usize,
 ) -> Result<impl Stream<Item = Result<RecordBatch, FlightError>> + Send + 
'static, Status> {
     let task_ctx = inner.ctx.task_ctx();
@@ -190,7 +191,7 @@ fn make_stream(
 }
 
 #[async_trait]
-impl FlightHandler for StageHandler {
+impl FlightHandler for DFRayProcessorHandler {
     async fn get_stream(
         &self,
         request: Request<Ticket>,
@@ -234,22 +235,22 @@ impl FlightHandler for StageHandler {
     }
 }
 
-/// StageService is a Arrow Flight service that serves streams of
+/// DFRayProcessorService is an Arrow Flight service that serves streams of
 /// partitions from a hosted Physical Plan
 ///
 /// It only responds to the DoGet Arrow Flight method
 #[pyclass]
-pub struct StageService {
+pub struct DFRayProcessorService {
     name: String,
     listener: Option<TcpListener>,
-    handler: Arc<StageHandler>,
+    handler: Arc<DFRayProcessorHandler>,
     addr: Option<String>,
     all_done_tx: Arc<Mutex<Sender<()>>>,
     all_done_rx: Option<Receiver<()>>,
 }
 
 #[pymethods]
-impl StageService {
+impl DFRayProcessorService {
     #[new]
     pub fn new(name: String) -> PyResult<Self> {
         let name = format!("[{}]", name);
@@ -259,7 +260,7 @@ impl StageService {
         let (all_done_tx, all_done_rx) = channel(1);
         let all_done_tx = Arc::new(Mutex::new(all_done_tx));
 
-        let handler = Arc::new(StageHandler::new(name.clone()));
+        let handler = Arc::new(DFRayProcessorHandler::new(name.clone()));
 
         Ok(Self {
             name,
@@ -313,7 +314,7 @@ impl StageService {
     }
 
     /// replace the plan that this service was providing, we will do this when 
we want
-    /// to reuse the StageService for a subsequent query
+    /// to reuse the DFRayProcessorService for a subsequent query
     ///
     /// returns a python coroutine that should be awaited
     pub fn update_plan<'a>(
diff --git a/src/proto/datafusion_ray.proto b/src/proto/datafusion_ray.proto
index 75d3ab1..5516b32 100644
--- a/src/proto/datafusion_ray.proto
+++ b/src/proto/datafusion_ray.proto
@@ -9,7 +9,7 @@ option java_outer_classname = "RayDataFusionProto";
 import "datafusion_common.proto";
 import "datafusion.proto";
 
-message RayStageReaderExecNode {
+message DFRayStageReaderExecNode {
   // schema of the stage we will consume
   datafusion_common.Schema schema = 1;
   // properties of the stage we will consume
diff --git a/src/proto/generated/protobuf.rs b/src/proto/generated/protobuf.rs
index 212a366..e5ffe9e 100644
--- a/src/proto/generated/protobuf.rs
+++ b/src/proto/generated/protobuf.rs
@@ -1,6 +1,6 @@
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct RayStageReaderExecNode {
+pub struct DfRayStageReaderExecNode {
     /// schema of the stage we will consume
     #[prost(message, optional, tag = "1")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
diff --git a/src/ray_stage.rs b/src/stage.rs
similarity index 97%
rename from src/ray_stage.rs
rename to src/stage.rs
index f7a0649..e1b139d 100644
--- a/src/ray_stage.rs
+++ b/src/stage.rs
@@ -79,7 +79,7 @@ use datafusion::{arrow::datatypes::SchemaRef, 
execution::SendableRecordBatchStre
 ///
 /// See [`crate::isolator::PartitionIsolatorExec`] for more information on how 
the shadow partitions work
 #[derive(Debug)]
-pub struct RayStageExec {
+pub struct DFRayStageExec {
     /// Input plan
     pub(crate) input: Arc<dyn ExecutionPlan>,
     /// Output partitioning
@@ -87,7 +87,7 @@ pub struct RayStageExec {
     pub stage_id: usize,
 }
 
-impl RayStageExec {
+impl DFRayStageExec {
     pub fn new(input: Arc<dyn ExecutionPlan>, stage_id: usize) -> Self {
         let properties = input.properties().clone();
 
@@ -110,7 +110,7 @@ impl RayStageExec {
         }
     }
 }
-impl DisplayAs for RayStageExec {
+impl DisplayAs for DFRayStageExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
         write!(
             f,
@@ -121,7 +121,7 @@ impl DisplayAs for RayStageExec {
     }
 }
 
-impl ExecutionPlan for RayStageExec {
+impl ExecutionPlan for DFRayStageExec {
     fn schema(&self) -> SchemaRef {
         self.input.schema()
     }
@@ -152,7 +152,7 @@ impl ExecutionPlan for RayStageExec {
         // as the plan tree is rearranged we want to remember the original 
partitioning that we
         // had, even if we get new inputs.   This is because 
RayStageReaderExecs, when created by
         // the RayDataFrame will need to know the original partitioning
-        Ok(Arc::new(RayStageExec::new_with_properties(
+        Ok(Arc::new(DFRayStageExec::new_with_properties(
             child,
             self.stage_id,
             self.properties.clone(),
diff --git a/src/ray_stage_reader.rs b/src/stage_reader.rs
similarity index 96%
rename from src/ray_stage_reader.rs
rename to src/stage_reader.rs
index c64a154..0916be4 100644
--- a/src/ray_stage_reader.rs
+++ b/src/stage_reader.rs
@@ -15,8 +15,8 @@ use futures::StreamExt;
 use log::trace;
 use prost::Message;
 
+use crate::processor_service::ServiceClients;
 use crate::protobuf::FlightTicketData;
-use crate::stage_service::ServiceClients;
 use crate::util::CombinedRecordBatchStream;
 
 /// An [`ExecutionPlan`] that will produce a stream of batches fetched from 
another stage
@@ -25,13 +25,13 @@ use crate::util::CombinedRecordBatchStream;
 /// Note that discovery of the service is handled by populating an instance of 
[`crate::stage_service::ServiceClients`]
 /// and storing it as an extension in the 
[`datafusion::execution::TaskContext`] configuration.
 #[derive(Debug)]
-pub struct RayStageReaderExec {
+pub struct DFRayStageReaderExec {
     properties: PlanProperties,
     schema: SchemaRef,
     pub stage_id: usize,
 }
 
-impl RayStageReaderExec {
+impl DFRayStageReaderExec {
     pub fn try_new_from_input(input: Arc<dyn ExecutionPlan>, stage_id: usize) 
-> Result<Self> {
         let properties = input.properties().clone();
 
@@ -53,7 +53,7 @@ impl RayStageReaderExec {
         })
     }
 }
-impl DisplayAs for RayStageReaderExec {
+impl DisplayAs for DFRayStageReaderExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
         write!(
             f,
@@ -64,7 +64,7 @@ impl DisplayAs for RayStageReaderExec {
     }
 }
 
-impl ExecutionPlan for RayStageReaderExec {
+impl ExecutionPlan for DFRayStageReaderExec {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
diff --git a/src/util.rs b/src/util.rs
index 941c6e4..0c07c5c 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -38,9 +38,9 @@ use pyo3::types::{PyBytes, PyList};
 use tonic::transport::Channel;
 
 use crate::codec::RayCodec;
+use crate::processor_service::ServiceClients;
 use crate::protobuf::FlightTicketData;
-use crate::ray_stage_reader::RayStageReaderExec;
-use crate::stage_service::ServiceClients;
+use crate::stage_reader::DFRayStageReaderExec;
 use prost::Message;
 use tokio::macros::support::thread_rng_n;
 
@@ -218,7 +218,7 @@ pub fn input_stage_ids(plan: &Arc<dyn ExecutionPlan>) -> 
Result<Vec<usize>, Data
     let mut result = vec![];
     plan.clone()
         .transform_down(|node: Arc<dyn ExecutionPlan>| {
-            if let Some(reader) = 
node.as_any().downcast_ref::<RayStageReaderExec>() {
+            if let Some(reader) = 
node.as_any().downcast_ref::<DFRayStageReaderExec>() {
                 result.push(reader.stage_id);
             }
             Ok(Transformed::no(node))
@@ -486,7 +486,7 @@ mod test {
         // test with file
         let tables = vec![(
             "people".to_string(),
-            format!("file://{}", file.path().to_str().unwrap().to_string()),
+            format!("file://{}", file.path().to_str().unwrap()),
         )];
         let query = "SELECT * FROM people ORDER BY age".to_string();
         let res = exec_sql(query.clone(), tables).await.unwrap();
@@ -501,7 +501,7 @@ mod test {
         // test with dir
         let tables = vec![(
             "people".to_string(),
-            format!("file://{}/", dir.path().to_str().unwrap().to_string()),
+            format!("file://{}/", dir.path().to_str().unwrap()),
         )];
         let res = exec_sql(query, tables).await.unwrap();
         assert_eq!(
diff --git a/testdata/tpch/.gitignore b/testdata/tpch/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/testdata/queries/q1.sql b/tpch/queries/q1.sql
similarity index 100%
rename from testdata/queries/q1.sql
rename to tpch/queries/q1.sql
diff --git a/testdata/queries/q10.sql b/tpch/queries/q10.sql
similarity index 100%
rename from testdata/queries/q10.sql
rename to tpch/queries/q10.sql
diff --git a/testdata/queries/q11.sql b/tpch/queries/q11.sql
similarity index 100%
rename from testdata/queries/q11.sql
rename to tpch/queries/q11.sql
diff --git a/testdata/queries/q12.sql b/tpch/queries/q12.sql
similarity index 100%
rename from testdata/queries/q12.sql
rename to tpch/queries/q12.sql
diff --git a/testdata/queries/q13.sql b/tpch/queries/q13.sql
similarity index 100%
rename from testdata/queries/q13.sql
rename to tpch/queries/q13.sql
diff --git a/testdata/queries/q14.sql b/tpch/queries/q14.sql
similarity index 100%
rename from testdata/queries/q14.sql
rename to tpch/queries/q14.sql
diff --git a/testdata/queries/q15.sql b/tpch/queries/q15.sql
similarity index 100%
rename from testdata/queries/q15.sql
rename to tpch/queries/q15.sql
diff --git a/testdata/queries/q16.sql b/tpch/queries/q16.sql
similarity index 100%
rename from testdata/queries/q16.sql
rename to tpch/queries/q16.sql
diff --git a/testdata/queries/q17.sql b/tpch/queries/q17.sql
similarity index 100%
rename from testdata/queries/q17.sql
rename to tpch/queries/q17.sql
diff --git a/testdata/queries/q18.sql b/tpch/queries/q18.sql
similarity index 100%
rename from testdata/queries/q18.sql
rename to tpch/queries/q18.sql
diff --git a/testdata/queries/q19.sql b/tpch/queries/q19.sql
similarity index 100%
rename from testdata/queries/q19.sql
rename to tpch/queries/q19.sql
diff --git a/testdata/queries/q2.sql b/tpch/queries/q2.sql
similarity index 100%
rename from testdata/queries/q2.sql
rename to tpch/queries/q2.sql
diff --git a/testdata/queries/q20.sql b/tpch/queries/q20.sql
similarity index 100%
rename from testdata/queries/q20.sql
rename to tpch/queries/q20.sql
diff --git a/testdata/queries/q21.sql b/tpch/queries/q21.sql
similarity index 100%
rename from testdata/queries/q21.sql
rename to tpch/queries/q21.sql
diff --git a/testdata/queries/q22.sql b/tpch/queries/q22.sql
similarity index 100%
rename from testdata/queries/q22.sql
rename to tpch/queries/q22.sql
diff --git a/testdata/queries/q3.sql b/tpch/queries/q3.sql
similarity index 100%
rename from testdata/queries/q3.sql
rename to tpch/queries/q3.sql
diff --git a/testdata/queries/q4.sql b/tpch/queries/q4.sql
similarity index 100%
rename from testdata/queries/q4.sql
rename to tpch/queries/q4.sql
diff --git a/testdata/queries/q5.sql b/tpch/queries/q5.sql
similarity index 100%
rename from testdata/queries/q5.sql
rename to tpch/queries/q5.sql
diff --git a/testdata/queries/q6.sql b/tpch/queries/q6.sql
similarity index 100%
rename from testdata/queries/q6.sql
rename to tpch/queries/q6.sql
diff --git a/testdata/queries/q7.sql b/tpch/queries/q7.sql
similarity index 100%
rename from testdata/queries/q7.sql
rename to tpch/queries/q7.sql
diff --git a/testdata/queries/q8.sql b/tpch/queries/q8.sql
similarity index 100%
rename from testdata/queries/q8.sql
rename to tpch/queries/q8.sql
diff --git a/testdata/queries/q9.sql b/tpch/queries/q9.sql
similarity index 100%
rename from testdata/queries/q9.sql
rename to tpch/queries/q9.sql
diff --git a/tpch/requirements.txt b/tpch/requirements.txt
index 2d257db..e995868 100644
--- a/tpch/requirements.txt
+++ b/tpch/requirements.txt
@@ -1,4 +1,3 @@
-# This is a bad idea, we should lock dependencies with poetry and consume this 
tool as an action
-pyarrow
-datafusion
-argparse
+duckdb
+ray[default]
+maturin
diff --git a/tpch/tpc.py b/tpch/tpc.py
deleted file mode 100644
index 8fa5ae8..0000000
--- a/tpch/tpc.py
+++ /dev/null
@@ -1,149 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#
-#
-# This file is useful for running a query against a TPCH dataset.
-#
-# You can run an arbitrary query by passing --query 'select...' or you can run 
a
-# TPCH query by passing --qnum 1-22.
-
-import argparse
-import ray
-from datafusion_ray import RayContext, runtime_env
-import os
-import sys
-import time
-
-try:
-    import duckdb
-except ImportError:
-    print(
-        "duckdb not installed, which is used in this file for retrieving the 
TPCH query"
-    )
-    sys.exit(1)
-
-
-def make_ctx(
-    data_path: str,
-    concurrency: int,
-    batch_size: int,
-    partitions_per_worker: int | None,
-    worker_pool_min: int,
-    listing_tables: bool,
-):
-
-    # Register the tables
-    table_names = [
-        "customer",
-        "lineitem",
-        "nation",
-        "orders",
-        "part",
-        "partsupp",
-        "region",
-        "supplier",
-    ]
-    # Connect to a cluster
-    # use ray job submit
-    ray.init(runtime_env=runtime_env)
-
-    ctx = RayContext(
-        batch_size=batch_size,
-        partitions_per_worker=partitions_per_worker,
-        worker_pool_min=worker_pool_min,
-    )
-
-    ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
-    # ctx.set("datafusion.execution.parquet.pushdown_filters", "true")
-    ctx.set("datafusion.optimizer.enable_round_robin_repartition", "false")
-    ctx.set("datafusion.execution.coalesce_batches", "false")
-
-    for table in table_names:
-        path = os.path.join(data_path, f"{table}.parquet")
-        print(f"Registering table {table} using path {path}")
-        if listing_tables:
-            ctx.register_listing_table(table, f"{path}/")
-        else:
-            ctx.register_parquet(table, path)
-
-    return ctx
-
-
-def main(
-    data_path: str,
-    concurrency: int,
-    batch_size: int,
-    query: str,
-    partitions_per_worker: int | None,
-    worker_pool_min: int,
-    listing_tables,
-) -> None:
-    ctx = make_ctx(
-        data_path,
-        concurrency,
-        batch_size,
-        partitions_per_worker,
-        worker_pool_min,
-        listing_tables,
-    )
-    df = ctx.sql(query)
-    time.sleep(3)
-    df.show()
-
-
-def tpch_query(qnum: int) -> str:
-    query_path = os.path.join(os.path.dirname(__file__), "..", "testdata", 
"queries")
-    return open(os.path.join(query_path, f"q{qnum}.sql")).read()
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--data", type=str, help="data path")
-    parser.add_argument("--query", type=str, help="query")
-    parser.add_argument(
-        "--qnum", type=int, default=0, help="query number for TPCH benchmark"
-    )
-    parser.add_argument("--concurrency", type=int, help="concurrency")
-    parser.add_argument("--batch-size", type=int, help="batch size")
-    parser.add_argument(
-        "--partitions-per-worker",
-        type=int,
-        help="Max partitions per Stage Service Worker",
-    )
-    parser.add_argument(
-        "--worker-pool-min",
-        type=int,
-        help="Minimum number of RayStages to keep in pool",
-    )
-    parser.add_argument("--listing-tables", action="store_true")
-    args = parser.parse_args()
-
-    if args.qnum > 0:
-        query = tpch_query(int(args.qnum))
-    else:
-        query = args.query
-
-    main(
-        args.data,
-        int(args.concurrency),
-        int(args.batch_size),
-        query,
-        args.partitions_per_worker,
-        args.worker_pool_min,
-        args.listing_tables,
-    )
diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py
index 85b2937..edb943e 100644
--- a/tpch/tpcbench.py
+++ b/tpch/tpcbench.py
@@ -17,24 +17,16 @@
 
 import argparse
 import ray
-from datafusion import SessionContext, SessionConfig
-from datafusion_ray import RayContext, exec_sql_on_tables, prettify, 
runtime_env
+from datafusion_ray import DFRayContext, df_ray_runtime_env
+from datafusion_ray.util import exec_sql_on_tables, prettify
 from datetime import datetime
 import json
 import os
 import time
 
-try:
-    import duckdb
-except ImportError:
-    print(
-        "duckdb not installed, which is used in this file for retrieving the 
TPCH query"
-    )
-    sys.exit(1)
-
 
 def tpch_query(qnum: int) -> str:
-    query_path = os.path.join(os.path.dirname(__file__), "..", "testdata", 
"queries")
+    query_path = os.path.join(os.path.dirname(__file__), "queries")
     return open(os.path.join(query_path, f"q{qnum}.sql")).read()
 
 
@@ -62,9 +54,9 @@ def main(
     ]
     # Connect to a cluster
     # use ray job submit
-    ray.init(runtime_env=runtime_env)
+    ray.init(runtime_env=df_ray_runtime_env)
 
-    ctx = RayContext(
+    ctx = DFRayContext(
         batch_size=batch_size,
         partitions_per_worker=partitions_per_worker,
         prefetch_buffer_size=prefetch_buffer_size,
@@ -76,8 +68,6 @@ def main(
     ctx.set("datafusion.optimizer.enable_round_robin_repartition", "false")
     ctx.set("datafusion.execution.coalesce_batches", "false")
 
-    local_config = SessionConfig()
-
     for table in table_names:
         path = os.path.join(data_path, f"{table}.parquet")
         print(f"Registering table {table} using path {path}")
@@ -106,8 +96,6 @@ def main(
         results["local_queries"] = {}
         results["validated"] = {}
 
-    duckdb.sql("load tpch")
-
     queries = range(1, 23) if qnum == -1 else [qnum]
     for qnum in queries:
         sql = tpch_query(qnum)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to