This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git


The following commit(s) were added to refs/heads/main by this push:
     new 116734d  Preallocate Ray Workers (#62)
116734d is described below

commit 116734dd4e48a74fbe6a11b936c1b32d2c1c31c8
Author: robtandy <[email protected]>
AuthorDate: Thu Feb 20 11:20:03 2025 -0500

    Preallocate Ray Workers (#62)
    
    * Refactor to preallocate a pool of RayStage Actors
    
    * corrected version of datafusion in requirements-in.txt
---
 datafusion_ray/__init__.py |    2 +-
 datafusion_ray/core.py     |  619 +++++++++++------
 datafusion_ray/friendly.py | 1588 ++++++++++++++++++++++++++++++++++++++++++++
 src/dataframe.rs           |    2 +-
 src/ray_stage_reader.rs    |   14 +-
 src/stage_service.rs       |  220 +++---
 tpch/tpc.py                |   53 +-
 tpch/tpcbench.py           |   16 +-
 8 files changed, 2194 insertions(+), 320 deletions(-)

diff --git a/datafusion_ray/__init__.py b/datafusion_ray/__init__.py
index a920253..d26afa8 100644
--- a/datafusion_ray/__init__.py
+++ b/datafusion_ray/__init__.py
@@ -20,6 +20,6 @@ try:
 except ImportError:
     import importlib_metadata
 
-from .core import RayContext, prettify, runtime_env
+from .core import RayContext, prettify, runtime_env, RayStagePool
 
 __version__ = importlib_metadata.version(__name__)
diff --git a/datafusion_ray/core.py b/datafusion_ray/core.py
index 3d36941..d9c9742 100644
--- a/datafusion_ray/core.py
+++ b/datafusion_ray/core.py
@@ -17,14 +17,16 @@
 
 
 from collections import defaultdict
+from dataclasses import dataclass
 import logging
 import os
 import pyarrow as pa
 import asyncio
 import ray
-import uuid
+import json
 import time
-from typing import Optional
+
+from .friendly import new_friendly_name
 
 from datafusion_ray._datafusion_ray_internal import (
     RayContext as RayContextInternal,
@@ -65,15 +67,11 @@ def call_sync(coro):
     """call a coroutine in the current event loop or run a new one, and 
synchronously
     return the result"""
     try:
-        try:
-            loop = asyncio.get_running_loop()
-        except RuntimeError:
-            return asyncio.run(coro)
-        else:
-            return loop.run_until_complete(coro)
-    except Exception as e:
-        log.error(f"Error in call: {e}")
-        log.exception(e)
+        loop = asyncio.get_running_loop()
+    except RuntimeError:
+        return asyncio.run(coro)
+    else:
+        return loop.run_until_complete(coro)
 
 
 # work around for https://github.com/ray-project/ray/issues/31606
@@ -82,30 +80,376 @@ async def _ensure_coro(maybe_obj_ref):
 
 
 async def wait_for(coros, name=""):
+    """Wait for all coros to complete and return their results.
+    Does not preserve ordering."""
+
     return_values = []
     # wrap the coro in a task to work with python 3.10 and 3.11+ where 
asyncio.wait semantics
     # changed to not accept any awaitable
+    start = time.time()
     done, _ = await asyncio.wait([asyncio.create_task(_ensure_coro(c)) for c 
in coros])
+    end = time.time()
+    log.info(f"waiting for {name} took {end - start}s")
     for d in done:
         e = d.exception()
         if e is not None:
             log.error(f"Exception waiting {name}: {e}")
+            raise e
         else:
             return_values.append(d.result())
     return return_values
 
 
+class RayStagePool:
+    """A pool of RayStage actors that can be acquired and released"""
+
+    # TODO: We can probably manage this set in a better way
+    # This is not a threadsafe implementation.
+    # This is simple though and will suffice for now
+
+    def __init__(self, min_workers: int, max_workers: int):
+        self.min_workers = min_workers
+        self.max_workers = max_workers
+
+        # a map of stage_key (a random identifier) to stage actor reference
+        self.pool = {}
+        # a map of stage_key to listening address
+        self.addrs = {}
+
+        # holds object references from the start_up method for each stage
+        # we know all stages are listening when all of these refs have
+        # been waited on.  When they are ready we remove them from this set
+        self.stages_started = set()
+
+        # an event that is set when all stages are ready to serve
+        self.stages_ready = asyncio.Event()
+
+        # stages that are started but we need to get their address
+        self.need_address = set()
+
+        # stages that we have the address for but need to start serving
+        self.need_serving = set()
+
+        # stages in use
+        self.acquired = set()
+
+        # stages available
+        self.available = set()
+
+        for _ in range(min_workers):
+            self._new_stage()
+
+        log.info(
+            f"created ray stage pool (min_workers: {min_workers}, max_workers: 
{max_workers})"
+        )
+
+    async def start(self):
+        if not self.stages_ready.is_set():
+            await self._wait_for_stages_started()
+            await self._wait_for_get_addrs()
+            await self._wait_for_serve()
+            self.stages_ready.set()
+
+    async def wait_for_ready(self):
+        await self.stages_ready.wait()
+
+    async def acquire(self, need=1):
+        stage_keys = []
+
+        have = len(self.available)
+        total = len(self.available) + len(self.acquired)
+        can_make = self.max_workers - total
+
+        need_to_make = need - have
+
+        if need_to_make > can_make:
+            raise Exception(f"Cannot allocate workers above 
{self.max_workers}")
+
+        if need_to_make > 0:
+            log.debug(f"creating {need_to_make} additional stages")
+            for _ in range(need_to_make):
+                self._new_stage()
+            await wait_for([self.start()], "waiting for created stages")
+
+        assert len(self.available) >= need
+
+        for _ in range(need):
+            stage_key = self.available.pop()
+            self.acquired.add(stage_key)
+
+            stage_keys.append(stage_key)
+
+        stages = [self.pool[sk] for sk in stage_keys]
+        addrs = [self.addrs[sk] for sk in stage_keys]
+        return (stages, stage_keys, addrs)
+
+    def release(self, stage_keys: list[str]):
+        for stage_key in stage_keys:
+            self.acquired.remove(stage_key)
+            self.available.add(stage_key)
+
+    def _new_stage(self):
+        self.stages_ready.clear()
+        stage_key = new_friendly_name()
+        log.debug(f"starting stage: {stage_key}")
+        stage = RayStage.options(name=f"Stage: {stage_key}").remote(stage_key)
+        self.pool[stage_key] = stage
+        self.stages_started.add(stage.start_up.remote())
+        self.available.add(stage_key)
+
+    async def _wait_for_stages_started(self):
+        log.info("waiting for stages to be ready")
+        started_keys = await wait_for(self.stages_started, "stages to be 
started")
+        # we need the addresses of these stages still
+        self.need_address.update(set(started_keys))
+        # we've started all the stages we know about
+        self.stages_started = set()
+        log.info("stages are all listening")
+
+    async def _wait_for_get_addrs(self):
+        # get the addresses in a pipelined fashion
+        refs = []
+        stage_keys = []
+        for stage_key in self.need_address:
+            stage = self.pool[stage_key]
+            refs.append(stage.addr.remote())
+            stage_keys.append(stage_key)
+
+            self.need_serving.add(stage_key)
+
+        addrs = await wait_for(refs, "stage addresses")
+
+        for key, addr in addrs:
+            self.addrs[key] = addr
+
+        self.need_address = set()
+
+    async def _wait_for_serve(self):
+        log.info("running stages")
+        try:
+            for stage_key in self.need_serving:
+                log.info(f"starting serving of stage {stage_key}")
+                stage = self.pool[stage_key]
+                stage.serve.remote()
+            self.need_serving = set()
+
+        except Exception as e:
+            log.error(f"StagePool: Uhandled Exception in serve: {e}")
+            raise e
+
+    async def all_done(self):
+        log.info("calling stage all done")
+        refs = [stage.all_done.remote() for stage in self.pool.values()]
+        await wait_for(refs, "stages to be all done")
+        log.info("all stages shutdown")
+
+
[email protected](num_cpus=0)
+class RayStage:
+    def __init__(self, stage_key):
+        self.stage_key = stage_key
+
+        # import this here so ray doesn't try to serialize the rust extension
+        from datafusion_ray._datafusion_ray_internal import StageService
+
+        self.stage_service = StageService(stage_key)
+
+    async def start_up(self):
+        # this method is sync
+        self.stage_service.start_up()
+        return self.stage_key
+
+    async def all_done(self):
+        await self.stage_service.all_done()
+
+    async def addr(self):
+        return (self.stage_key, self.stage_service.addr())
+
+    async def update_plan(
+        self,
+        stage_id: int,
+        stage_addrs: dict[int, dict[int, list[str]]],
+        partition_group: list[int],
+        plan_bytes: bytes,
+    ):
+        await self.stage_service.update_plan(
+            stage_id,
+            stage_addrs,
+            partition_group,
+            plan_bytes,
+        )
+
+    async def serve(self):
+        log.info(f"[{self.stage_key}] serving on {self.stage_service.addr()}")
+        await self.stage_service.serve()
+        log.info(f"[{self.stage_key}] done serving")
+
+
+@dataclass
+class StageData:
+    """Driver-side description of the work for one stage actor.
+
+    One instance is built per (stage, partition group) pair and sent to the
+    supervisor's ``new_query``.
+    """
+
+    stage_id: int
+    plan_bytes: bytes
+    # partitions handled by the actor this StageData is assigned to
+    partition_group: list[int]
+    # stage_ids whose output this stage reads
+    child_stage_ids: list[int]
+    num_output_partitions: int
+    # True: an actor of this stage is the definitive source for the output
+    # partitions it produces; False: every output partition must be gathered
+    # from all actors sharing this stage_id (see sort_out_addresses)
+    full_partitions: bool
+
+
+@dataclass
+class InternalStageData:
+    stage_id: int
+    plan_bytes: bytes
+    partition_group: list[int]
+    child_stage_ids: list[int]
+    num_output_partitions: int
+    full_partitions: bool
+    remote_stage: ...  # ray.actor.ActorHandle[RayStage]
+    remote_addr: str
+
+    def __str__(self):
+        return f"""Stage: {self.stage_id}, pg: {self.partition_group}, 
child_stages:{self.child_stage_ids}, listening addr:{self.remote_addr}"""
+
+
[email protected](num_cpus=0)
+class RayContextSupervisor:
+    def __init__(
+        self,
+        worker_pool_min: int,
+        worker_pool_max: int,
+    ) -> None:
+        log.info(f"Creating RayContextSupervisor worker_pool_min: 
{worker_pool_min}")
+        self.pool = RayStagePool(worker_pool_min, worker_pool_max)
+        self.stages: dict[str, InternalStageData] = {}
+        log.info("Created RayContextSupervisor")
+
+    async def start(self):
+        await self.pool.start()
+
+    async def wait_for_ready(self):
+        await self.pool.wait_for_ready()
+
+    async def get_stage_addrs(self, stage_id: int):
+        addrs = [
+            sd.remote_addr for sd in self.stages.values() if sd.stage_id == 
stage_id
+        ]
+        return addrs
+
+    async def new_query(
+        self,
+        stage_datas: list[StageData],
+    ):
+        if len(self.stages) > 0:
+            self.pool.release(list(self.stages.keys()))
+
+        remote_stages, remote_stage_keys, remote_addrs = await 
self.pool.acquire(
+            len(stage_datas)
+        )
+        self.stages = {}
+
+        for i, sd in enumerate(stage_datas):
+            remote_stage = remote_stages[i]
+            remote_stage_key = remote_stage_keys[i]
+            remote_addr = remote_addrs[i]
+            self.stages[remote_stage_key] = InternalStageData(
+                sd.stage_id,
+                sd.plan_bytes,
+                sd.partition_group,
+                sd.child_stage_ids,
+                sd.num_output_partitions,
+                sd.full_partitions,
+                remote_stage,
+                remote_addr,
+            )
+
+        # sort out the mess of who talks to whom and ensure we can supply the 
correct
+        # addresses to each of them
+        addrs_by_stage_key = await self.sort_out_addresses()
+        if log.level <= logging.DEBUG:
+            # TODO: string builder here
+            out = ""
+            for stage_key, stage in self.stages.items():
+                out += f"[{stage_key}]: {stage}\n"
+                out += f"child addrs: {addrs_by_stage_key[stage_key]}\n"
+            log.debug(out)
+
+        refs = []
+        # now tell the stages what they are doing for this query
+        for stage_key, isd in self.stages.items():
+            log.info(f"going to update plan for {stage_key}")
+            kid = addrs_by_stage_key[stage_key]
+            refs.append(
+                isd.remote_stage.update_plan.remote(
+                    isd.stage_id,
+                    {stage_id: val["child_addrs"] for (stage_id, val) in 
kid.items()},
+                    isd.partition_group,
+                    isd.plan_bytes,
+                )
+            )
+        log.info("that's all of them")
+
+        await wait_for(refs, "updating plans")
+
+    async def sort_out_addresses(self):
+        """Iterate through our stages and gather all of their listening 
addresses.
+        Then, provide the addresses to of peer stages to each stage.
+        """
+        addrs_by_stage_key = {}
+        for stage_key, isd in self.stages.items():
+            stage_addrs = defaultdict(dict)
+
+            # using "isd" as shorthand to denote InternalStageData as a 
reminder
+
+            for child_stage_id in isd.child_stage_ids:
+                addrs = defaultdict(list)
+                child_stage_keys, child_stage_datas = zip(
+                    *filter(
+                        lambda x: x[1].stage_id == child_stage_id,
+                        self.stages.items(),
+                    )
+                )
+                output_partitions = [
+                    c_isd.num_output_partitions for c_isd in child_stage_datas
+                ]
+
+                # sanity check
+                assert all([op == output_partitions[0] for op in 
output_partitions])
+                output_partitions = output_partitions[0]
+
+                for child_stage_isd in child_stage_datas:
+                    if child_stage_isd.full_partitions:
+                        for partition in range(output_partitions):
+                            # this stage is the definitive place to read this 
output partition
+                            addrs[partition] = [child_stage_isd.remote_addr]
+                    else:
+                        for partition in range(output_partitions):
+                            # this output partition must be gathered from all 
stages with this stage_id
+                            addrs[partition] = [
+                                c.remote_addr for c in child_stage_datas
+                            ]
+
+                stage_addrs[child_stage_id]["child_addrs"] = addrs
+                # not necessary but useful for debug logs
+                stage_addrs[child_stage_id]["stage_keys"] = child_stage_keys
+
+            addrs_by_stage_key[stage_key] = stage_addrs
+
+        return addrs_by_stage_key
+
+    async def all_done(self):
+        await self.pool.all_done()
+
+
 class RayDataFrame:
     def __init__(
         self,
         ray_internal_df: RayDataFrameInternal,
-        query_id: str,
+        supervisor,  # ray.actor.ActorHandle[RayContextSupervisor],
         batch_size=8192,
-        partitions_per_worker: Optional[int] = None,
+        partitions_per_worker: int | None = None,
         prefetch_buffer_size=0,
     ):
         self.df = ray_internal_df
-        self.query_id = query_id
+        self.supervisor = supervisor
         self._stages = None
         self._batches = None
         self.batch_size = batch_size
@@ -119,12 +463,6 @@ class RayDataFrame:
                 self.batch_size, self.prefetch_buffer_size, 
self.partitions_per_worker
             )
 
-            self.coord = RayStageCoordinator.options(
-                name="RayQueryCoordinator:" + self.query_id,
-            ).remote(
-                self.query_id,
-            )
-
         return self._stages
 
     def execution_plan(self):
@@ -143,19 +481,19 @@ class RayDataFrame:
             t2 = time.time()
             log.debug(f"creating stages took {t2 -t1}s")
 
-            last_stage = max([stage.stage_id for stage in self._stages])
-            log.debug(f"last stage is {last_stage}")
+            last_stage_id = max([stage.stage_id for stage in self._stages])
+            log.debug(f"last stage is {last_stage_id}")
 
             self.create_ray_stages()
-            t3 = time.time()
-            log.debug(f"creating ray stage actors took {t3 -t2}s")
-            self.run_stages()
 
-            addrs = ray.get(self.coord.get_stage_addrs.remote())
+            last_stage_addrs = ray.get(
+                self.supervisor.get_stage_addrs.remote(last_stage_id)
+            )
+            log.debug(f"last stage addrs {last_stage_addrs}")
 
-            reader = self.df.read_final_stage(last_stage, 
addrs[last_stage][0][0])
+            reader = self.df.read_final_stage(last_stage_id, 
last_stage_addrs[0])
+            log.debug("got reader")
             self._batches = list(reader)
-            self.coord.all_done.remote()
         return self._batches
 
     def show(self) -> None:
@@ -163,31 +501,27 @@ class RayDataFrame:
         print(prettify(batches))
 
     def create_ray_stages(self):
+        """Describe every stage actor of this query to the supervisor and wait
+        until it has assigned actors and delivered their plans."""
+        stage_datas = []

-        # if we are doing each partition separate (isolate_partitions =True)
-        # then the plan generated will include a PartitionIsolator which
-        # will take care of that.  Our job is to then launch a stage for each
-        # partition.
-        #
-        refs = []
+        # Each PyDataFrameStage in self.stages() describes one numbered stage,
+        # but the supervisor wants one StageData per actor that will run it,
+        # i.e. one per partition group -- hence the inner loop over
+        # partition_groups.
         for stage in self.stages():
             for partition_group in stage.partition_groups:
-                refs.append(
-                    self.coord.new_stage.remote(
+                stage_datas.append(
+                    StageData(
                         stage.stage_id,
                         stage.plan_bytes(),
                         partition_group,
+                        stage.child_stage_ids,
                         stage.num_output_partitions,
                         stage.full_partitions,
                     )
                 )

-        # wait for all stages to be created
-        # ray.wait(refs, num_returns=len(refs))
-        call_sync(wait_for(refs, "creating ray stages"))
-
-    def run_stages(self):
-        self.coord.serve.remote()
+        # new_query only returns once every actor has received its plan
+        ref = self.supervisor.new_query.remote(stage_datas)
+        call_sync(wait_for([ref], "creating ray stages"))
 
 
 class RayContext:
@@ -195,13 +529,32 @@ class RayContext:
         self,
         batch_size: int = 8192,
         prefetch_buffer_size: int = 0,
-        partitions_per_worker: Optional[int] = None,
+        partitions_per_worker: int | None = None,
+        worker_pool_min: int = 1,
+        worker_pool_max: int = 100,
     ) -> None:
         self.ctx = RayContextInternal()
         self.batch_size = batch_size
         self.partitions_per_worker = partitions_per_worker
         self.prefetch_buffer_size = prefetch_buffer_size
 
+        self.supervisor = RayContextSupervisor.options(
+            name="RayContextSupersisor",
+        ).remote(
+            worker_pool_min,
+            worker_pool_max,
+        )
+
+        # start up our super visor and don't check in on it until its
+        # time to query, then we will await this ref
+        start_ref = self.supervisor.start.remote()
+
+        # ensure we are ready
+        s = time.time()
+        call_sync(wait_for([start_ref], "RayContextSupervisor start"))
+        e = time.time()
+        log.info(f"RayContext::__init__ waiting for supervisor to be ready 
took {e-s}s")
+
     def register_parquet(self, name: str, path: str):
         self.ctx.register_parquet(name, path)
 
@@ -209,12 +562,12 @@ class RayContext:
         self.ctx.register_listing_table(name, path, file_extention)
 
     def sql(self, query: str) -> RayDataFrame:
-        query_id = str(uuid.uuid4())
 
         df = self.ctx.sql(query)
+
         return RayDataFrame(
             df,
-            query_id,
+            self.supervisor,
             self.batch_size,
             self.partitions_per_worker,
             self.prefetch_buffer_size,
@@ -223,175 +576,9 @@ class RayContext:
     def set(self, option: str, value: str) -> None:
         self.ctx.set(option, value)
 
+    def __del__(self):
+        log.info("RayContext, cleaning up remote resources")
+        ref = self.supervisor.all_done.remote()
+        call_sync(wait_for([ref], "RayContextSupervisor all done"))
 
[email protected](num_cpus=0)
-class RayStageCoordinator:
-    def __init__(
-        self,
-        query_id: str,
-    ) -> None:
-        self.query_id = query_id
-        self.stages = {}
-        self.stage_addrs = defaultdict(lambda: defaultdict(list))
-        self.output_partitions = {}
-        self.stages_started = []
-        self.stages_ready = asyncio.Event()
-
-    async def all_done(self):
-        log.debug("calling stage all done")
-        refs = [stage.all_done.remote() for stage in self.stages.values()]
-        # ray.wait(refs, num_returns=len(refs))
-        await wait_for(refs, "stages to be all done")
-        log.debug("done stage all done")
-
-    async def new_stage(
-        self,
-        stage_id: int,
-        plan_bytes: bytes,
-        partition_group: list[int],
-        num_output_partitions: int,
-        full_partitions: bool,
-    ):
-
-        try:
-            if stage_id in self.output_partitions:
-                assert self.output_partitions[stage_id] == 
num_output_partitions
-            else:
-                self.output_partitions[stage_id] = num_output_partitions
-
-            # we need a tuple so its hashable
-            partition_set = tuple(partition_group)
-            stage_key = (stage_id, partition_set, full_partitions)
-
-            log.debug(f"creating new stage {stage_key} from bytes 
{len(plan_bytes)}")
-            stage = RayStage.options(
-                name=f"Stage: {stage_key}, query_id:{self.query_id}",
-            ).remote(stage_id, plan_bytes, partition_group)
-            self.stages[stage_key] = stage
-            self.stages_started.append(stage.start_up.remote())
-
-        except Exception as e:
-            log.error(
-                f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in 
new stage! {e}"
-            )
-            raise e
-
-    async def wait_for_stages_ready(self):
-        log.debug("waiting for stages to be ready")
-        await self.stages_ready.wait()
-
-    async def ensure_stages_ready(self):
-        # ray.wait(self.stages_started, num_returns=len(self.stages_started))
-        log.debug(f"going to wait for {self.stages_started}")
-        await wait_for(self.stages_started, "stages to be started")
-        await self.sort_out_addresses()
-        log.info("all stages ready")
-        self.stages_ready.set()
-
-    async def get_stage_addrs(self) -> dict[int, list[str]]:
-        log.debug("Checking to ensure stages are ready before returning addrs")
-        await self.wait_for_stages_ready()
-        log.debug("Looks like they are ready")
-        return self.stage_addrs
-
-    async def sort_out_addresses(self):
-        """Iterate through our stages and gather all of their listening 
addresses.
-        Then, provide the addresses to of peer stages to each stage.
-        """
-
-        # first go get all addresses from the stages we launched, concurrently
-        # pipeline this by firing up all tasks before awaiting any results
-        addrs_by_stage = defaultdict(list)
-        addrs_by_stage_partition = defaultdict(dict)
-        for stage_key, stage in self.stages.items():
-            stage_id, partition_set, full_partitions = stage_key
-            a_future = stage.addr.remote()
-            addrs_by_stage[stage_id].append(a_future)
-            for partition in partition_set:
-                addrs_by_stage_partition[stage_id][partition] = a_future
-
-        for stage_key, stage in self.stages.items():
-            stage_id, partition_set, full_partitions = stage_key
-            if full_partitions:
-                for partition in range(self.output_partitions[stage_id]):
-                    self.stage_addrs[stage_id][partition] = await wait_for(
-                        [addrs_by_stage_partition[stage_id][partition]]
-                    )
-            else:
-                for partition in range(self.output_partitions[stage_id]):
-                    self.stage_addrs[stage_id][partition] = await wait_for(
-                        addrs_by_stage[stage_id]
-                    )
-
-        if log.level <= logging.DEBUG:
-            out = ""
-            for stage_id, partition_addrs in self.stage_addrs.items():
-                out += f"Stage {stage_id}: \n"
-                for partition, addrs in partition_addrs.items():
-                    out += f"  partition {partition}: {addrs}\n"
-            log.debug(f"stage_addrs:\n{out}")
-        # now update all the stages with the addresses of peers such
-        # that they can contact their child stages
-        refs = []
-        for stage_key, stage in self.stages.items():
-            refs.append(stage.set_stage_addrs.remote(self.stage_addrs))
-
-        # ray.wait(refs, num_returns=len(refs))
-        await wait_for(refs, "stages to to have addrs set")
-        log.debug("all stage addrs set? or should be")
-
-    async def serve(self):
-        await self.ensure_stages_ready()
-        log.info("running stages")
-        try:
-            for stage_key, stage in self.stages.items():
-                log.info(f"starting serving of stage {stage_key}")
-                stage.serve.remote()
-
-        except Exception as e:
-            log.error(
-                f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in 
run stages! {e}"
-            )
-            raise e
-
-
[email protected](num_cpus=0)
-class RayStage:
-    def __init__(
-        self,
-        stage_id: int,
-        plan_bytes: bytes,
-        partition_group: list[int],
-    ):
-
-        from datafusion_ray._datafusion_ray_internal import StageService
-
-        try:
-            self.stage_id = stage_id
-            self.stage_service = StageService(
-                stage_id,
-                plan_bytes,
-                partition_group,
-            )
-        except Exception as e:
-            log.error(
-                f"StageService[{self.stage_id}{partition_group}] Unhandled 
Exception in init: {e}!"
-            )
-            raise
-
-    async def start_up(self):
-        # this method is sync
-        self.stage_service.start_up()
-
-    async def all_done(self):
-        await self.stage_service.all_done()
-
-    async def addr(self):
-        return self.stage_service.addr()
-
-    async def set_stage_addrs(self, stage_addrs: dict[int, list[str]]):
-        await self.stage_service.set_stage_addrs(stage_addrs)
-
-    async def serve(self):
-        await self.stage_service.serve()
-        log.info("StageService done serving")
+    # log.debug("all stage addrs set? or should be")
diff --git a/datafusion_ray/friendly.py b/datafusion_ray/friendly.py
new file mode 100644
index 0000000..49d31e7
--- /dev/null
+++ b/datafusion_ray/friendly.py
@@ -0,0 +1,1588 @@
+# Adapted from http://en.wikipedia.org/wiki/List_of_animal_names
+# and https://github.com/bryanmylee/zoo-ids
+
+import random
+
+
+def new_friendly_name():
+    name = random.choice(animals)
+    adj = adjectives.pop()
+    return f"{adj}-{name}"
+
+
+# In total, 221 animals
+animals = [
+    "aardvark",
+    "albatross",
+    "alligator",
+    "alpaca",
+    "ant",
+    "anteater",
+    "antelope",
+    "ape",
+    "armadillo",
+    "donkey",
+    "baboon",
+    "badger",
+    "barracuda",
+    "bat",
+    "bear",
+    "beaver",
+    "bee",
+    "bison",
+    "boar",
+    "buffalo",
+    "butterfly",
+    "camel",
+    "capybara",
+    "caribou",
+    "cassowary",
+    "cat",
+    "caterpillar",
+    "cattle",
+    "chamois",
+    "cheetah",
+    "chicken",
+    "chimpanzee",
+    "chinchilla",
+    "chough",
+    "clam",
+    "cobra",
+    "cockroach",
+    "cod",
+    "cormorant",
+    "coyote",
+    "crab",
+    "crane",
+    "crocodile",
+    "crow",
+    "curlew",
+    "deer",
+    "dinosaur",
+    "dog",
+    "dogfish",
+    "dolphin",
+    "dotterel",
+    "dove",
+    "dragonfly",
+    "duck",
+    "dugong",
+    "dunlin",
+    "eagle",
+    "echidna",
+    "eel",
+    "eland",
+    "elephant",
+    "elk",
+    "emu",
+    "falcon",
+    "ferret",
+    "finch",
+    "fish",
+    "flamingo",
+    "fly",
+    "fox",
+    "frog",
+    "gaur",
+    "gazelle",
+    "gerbil",
+    "giraffe",
+    "gnat",
+    "gnu",
+    "goat",
+    "goldfinch",
+    "goldfish",
+    "goose",
+    "gorilla",
+    "goshawk",
+    "grasshopper",
+    "grouse",
+    "guanaco",
+    "gull",
+    "hamster",
+    "hare",
+    "hawk",
+    "hedgehog",
+    "heron",
+    "herring",
+    "hippopotamus",
+    "hornet",
+    "horse",
+    "human",
+    "hummingbird",
+    "hyena",
+    "ibex",
+    "ibis",
+    "jackal",
+    "jaguar",
+    "jay",
+    "jellyfish",
+    "kangaroo",
+    "kingfisher",
+    "koala",
+    "kookabura",
+    "kouprey",
+    "kudu",
+    "lapwing",
+    "lark",
+    "lemur",
+    "leopard",
+    "lion",
+    "llama",
+    "lobster",
+    "locust",
+    "loris",
+    "louse",
+    "lyrebird",
+    "magpie",
+    "mallard",
+    "manatee",
+    "mandrill",
+    "mantis",
+    "marten",
+    "meerkat",
+    "mink",
+    "mole",
+    "mongoose",
+    "monkey",
+    "moose",
+    "mosquito",
+    "mouse",
+    "mule",
+    "narwhal",
+    "newt",
+    "nightingale",
+    "octopus",
+    "okapi",
+    "opossum",
+    "oryx",
+    "ostrich",
+    "otter",
+    "owl",
+    "oyster",
+    "panther",
+    "parrot",
+    "partridge",
+    "peafowl",
+    "pelican",
+    "penguin",
+    "pheasant",
+    "pig",
+    "pigeon",
+    "pony",
+    "porcupine",
+    "porpoise",
+    "quail",
+    "quelea",
+    "quetzal",
+    "rabbit",
+    "raccoon",
+    "rail",
+    "ram",
+    "rat",
+    "raven",
+    "reindeer",
+    "rhinoceros",
+    "rook",
+    "salamander",
+    "salmon",
+    "sandpiper",
+    "sardine",
+    "scorpion",
+    "seahorse",
+    "seal",
+    "shark",
+    "sheep",
+    "shrew",
+    "skunk",
+    "snail",
+    "snake",
+    "sparrow",
+    "spider",
+    "spoonbill",
+    "squid",
+    "squirrel",
+    "starling",
+    "stingray",
+    "stinkbug",
+    "stork",
+    "swallow",
+    "swan",
+    "tapir",
+    "tarsier",
+    "termite",
+    "tiger",
+    "toad",
+    "trout",
+    "turkey",
+    "turtle",
+    "viper",
+    "vulture",
+    "wallaby",
+    "walrus",
+    "wasp",
+    "weasel",
+    "whale",
+    "wildcat",
+    "wolf",
+    "wolverine",
+    "wombat",
+    "woodcock",
+    "woodpecker",
+    "worm",
+    "wren",
+    "yak",
+    "zebra",
+]
+
+adjectives = set(  # lowercase, distinct words; presumably paired with `animals` by new_friendly_name
+    [
+        "abandoned",
+        "able",
+        "absolute",
+        "adorable",
+        "adventurous",
+        "academic",
+        "acceptable",
+        "acclaimed",
+        "accomplished",
+        "accurate",
+        "aching",
+        "acidic",
+        "acrobatic",
+        "active",
+        "actual",
+        "adept",
+        "admirable",
+        "admired",
+        "adolescent",
+        "adoptive",
+        "adored",
+        "advanced",
+        "afraid",
+        "affectionate",
+        "aged",
+        "aggravating",
+        "aggressive",
+        "agile",
+        "agitated",
+        "agonizing",
+        "agreeable",
+        "ajar",
+        "alarmed",
+        "alarming",
+        "alert",
+        "alienated",
+        "alive",
+        "all",
+        "altruistic",
+        "amazing",
+        "ambitious",
+        "ample",
+        "amused",
+        "amusing",
+        "anchored",
+        "ancient",
+        "angelic",
+        "angry",
+        "anguished",
+        "animated",
+        "annual",
+        "another",
+        "antique",
+        "anxious",
+        "any",
+        "apprehensive",
+        "appropriate",
+        "apt",
+        "arctic",
+        "arid",
+        "aromatic",
+        "artistic",
+        "ashamed",
+        "assured",
+        "astonishing",
+        "athletic",
+        "attached",
+        "attentive",
+        "attractive",
+        "austere",
+        "authentic",
+        "authorized",
+        "automatic",
+        "avaricious",
+        "average",
+        "aware",
+        "awesome",
+        "awful",
+        "awkward",
+        "babyish",
+        "bad",
+        "back",
+        "baggy",
+        "bare",
+        "barren",
+        "basic",
+        "beautiful",
+        "belated",
+        "beloved",
+        "beneficial",
+        "better",
+        "best",
+        "bewitched",
+        "big",
+        "big-hearted",
+        "biodegradable",
+        "bite-sized",
+        "bitter",
+        "black",
+        "black-and-white",
+        "bland",
+        "blank",
+        "blaring",
+        "bleak",
+        "blind",
+        "blissful",
+        "blond",
+        "blue",
+        "blushing",
+        "bogus",
+        "boiling",
+        "bold",
+        "bony",
+        "boring",
+        "bossy",
+        "both",
+        "bouncy",
+        "bountiful",
+        "bowed",
+        "brave",
+        "breakable",
+        "brief",
+        "bright",
+        "brilliant",
+        "brisk",
+        "broken",
+        "bronze",
+        "brown",
+        "bruised",
+        "bubbly",
+        "bulky",
+        "bumpy",
+        "buoyant",
+        "burdensome",
+        "burly",
+        "bustling",
+        "busy",
+        "buttery",
+        "buzzing",
+        "calculating",
+        "calm",
+        "candid",
+        "canine",
+        "capital",
+        "carefree",
+        "careful",
+        "careless",
+        "caring",
+        "cautious",
+        "cavernous",
+        "celebrated",
+        "charming",
+        "cheap",
+        "cheerful",
+        "cheery",
+        "chief",
+        "chilly",
+        "chubby",
+        "circular",
+        "classic",
+        "clean",
+        "clear",
+        "clear-cut",
+        "clever",
+        "close",
+        "closed",
+        "cloudy",
+        "clueless",
+        "clumsy",
+        "cluttered",
+        "coarse",
+        "cold",
+        "colorful",
+        "colorless",
+        "colossal",
+        "comfortable",
+        "common",
+        "compassionate",
+        "competent",
+        "complete",
+        "complex",
+        "complicated",
+        "composed",
+        "concerned",
+        "concrete",
+        "confused",
+        "conscious",
+        "considerate",
+        "constant",
+        "content",
+        "conventional",
+        "cooked",
+        "cool",
+        "cooperative",
+        "coordinated",
+        "corny",
+        "corrupt",
+        "costly",
+        "courageous",
+        "courteous",
+        "crafty",
+        "crazy",
+        "creamy",
+        "creative",
+        "creepy",
+        "criminal",
+        "crisp",
+        "critical",
+        "crooked",
+        "crowded",
+        "cruel",
+        "crushing",
+        "cuddly",
+        "cultivated",
+        "cultured",
+        "cumbersome",
+        "curly",
+        "curvy",
+        "cute",
+        "cylindrical",
+        "damaged",
+        "damp",
+        "dangerous",
+        "dapper",
+        "daring",
+        "darling",
+        "dark",
+        "dazzling",
+        "dead",
+        "deadly",
+        "deafening",
+        "dear",
+        "dearest",
+        "decent",
+        "decimal",
+        "decisive",
+        "deep",
+        "defenseless",
+        "defensive",
+        "defiant",
+        "deficient",
+        "definite",
+        "definitive",
+        "delayed",
+        "delectable",
+        "delicious",
+        "delightful",
+        "delirious",
+        "demanding",
+        "dense",
+        "dental",
+        "dependable",
+        "dependent",
+        "descriptive",
+        "deserted",
+        "detailed",
+        "determined",
+        "devoted",
+        "different",
+        "difficult",
+        "digital",
+        "diligent",
+        "dim",
+        "dimpled",
+        "dimwitted",
+        "direct",
+        "disastrous",
+        "discrete",
+        "disfigured",
+        "disgusting",
+        "disloyal",
+        "disobedient",
+        "disorganized",
+        "distracted",
+        "diverse",
+        "dirty",
+        "disguised",
+        "dishonest",
+        "dismal",
+        "distant",
+        "distinct",
+        "distorted",
+        "dizzy",
+        "dopey",
+        "doting",
+        "double",
+        "downright",
+        "drab",
+        "drafty",
+        "dramatic",
+        "dreary",
+        "droopy",
+        "dry",
+        "dual",
+        "dull",
+        "dutiful",
+        "each",
+        "eager",
+        "earnest",
+        "early",
+        "easy",
+        "easy-going",
+        "ecstatic",
+        "edible",
+        "educated",
+        "elaborate",
+        "elastic",
+        "elated",
+        "elderly",
+        "electric",
+        "elegant",
+        "elementary",
+        "elliptical",
+        "embarrassed",
+        "embellished",
+        "eminent",
+        "emotional",
+        "empty",
+        "enchanted",
+        "enchanting",
+        "energetic",
+        "enlightened",
+        "enormous",
+        "enraged",
+        "entire",
+        "envious",
+        "equal",
+        "equatorial",
+        "essential",
+        "esteemed",
+        "ethical",
+        "euphoric",
+        "even",
+        "evergreen",
+        "everlasting",
+        "every",
+        "evil",
+        "exalted",
+        "excellent",
+        "exemplary",
+        "exhausted",
+        "excitable",
+        "excited",
+        "exciting",
+        "exotic",
+        "expensive",
+        "experienced",
+        "expert",
+        "extraneous",
+        "extroverted",
+        "extra-large",
+        "extra-small",
+        "fabulous",
+        "failing",
+        "faint",
+        "fair",
+        "faithful",
+        "fake",
+        "false",
+        "familiar",
+        "famous",
+        "fancy",
+        "fantastic",
+        "far",
+        "faraway",
+        "far-flung",
+        "far-off",
+        "fast",
+        "fat",
+        "fatal",
+        "fatherly",
+        "favorable",
+        "favorite",
+        "fearful",
+        "fearless",
+        "feisty",
+        "feline",
+        "female",
+        "feminine",
+        "few",
+        "fickle",
+        "filthy",
+        "fine",
+        "finished",
+        "firm",
+        "first",
+        "firsthand",
+        "fitting",
+        "fixed",
+        "flaky",
+        "flamboyant",
+        "flashy",
+        "flat",
+        "flawed",
+        "flawless",
+        "flickering",
+        "flimsy",
+        "flippant",
+        "flowery",
+        "fluffy",
+        "fluid",
+        "flustered",
+        "focused",
+        "fond",
+        "foolhardy",
+        "foolish",
+        "forceful",
+        "forked",
+        "formal",
+        "forsaken",
+        "forthright",
+        "fortunate",
+        "fragrant",
+        "frail",
+        "frank",
+        "frayed",
+        "free",
+        "french",
+        "fresh",
+        "frequent",
+        "friendly",
+        "frightened",
+        "frightening",
+        "frigid",
+        "frilly",
+        "frizzy",
+        "frivolous",
+        "front",
+        "frosty",
+        "frozen",
+        "frugal",
+        "fruitful",
+        "full",
+        "fumbling",
+        "functional",
+        "funny",
+        "fussy",
+        "fuzzy",
+        "gargantuan",
+        "gaseous",
+        "general",
+        "generous",
+        "gentle",
+        "genuine",
+        "giant",
+        "giddy",
+        "gigantic",
+        "gifted",
+        "giving",
+        "glamorous",
+        "glaring",
+        "glass",
+        "gleaming",
+        "gleeful",
+        "glistening",
+        "glittering",
+        "gloomy",
+        "glorious",
+        "glossy",
+        "glum",
+        "golden",
+        "good",
+        "good-natured",
+        "gorgeous",
+        "graceful",
+        "gracious",
+        "grand",
+        "grandiose",
+        "granular",
+        "grateful",
+        "grave",
+        "gray",
+        "great",
+        "greedy",
+        "green",
+        "gregarious",
+        "grim",
+        "grimy",
+        "gripping",
+        "grizzled",
+        "gross",
+        "grotesque",
+        "grouchy",
+        "grounded",
+        "growing",
+        "growling",
+        "grown",
+        "grubby",
+        "gruesome",
+        "grumpy",
+        "guilty",
+        "gullible",
+        "gummy",
+        "hairy",
+        "half",
+        "handmade",
+        "handsome",
+        "handy",
+        "happy",
+        "happy-go-lucky",
+        "hard",
+        "hard-to-find",
+        "harmful",
+        "harmless",
+        "harmonious",
+        "harsh",
+        "hasty",
+        "hateful",
+        "haunting",
+        "healthy",
+        "heartfelt",
+        "hearty",
+        "heavenly",
+        "heavy",
+        "hefty",
+        "helpful",
+        "helpless",
+        "hidden",
+        "hideous",
+        "high",
+        "high-level",
+        "hilarious",
+        "hoarse",
+        "hollow",
+        "homely",
+        "honest",
+        "honorable",
+        "honored",
+        "hopeful",
+        "horrible",
+        "hospitable",
+        "hot",
+        "huge",
+        "humble",
+        "humiliating",
+        "humming",
+        "humongous",
+        "hungry",
+        "hurtful",
+        "husky",
+        "icky",
+        "icy",
+        "ideal",
+        "idealistic",
+        "identical",
+        "idle",
+        "idiotic",
+        "idolized",
+        "ignorant",
+        "ill",
+        "illegal",
+        "ill-fated",
+        "ill-informed",
+        "illiterate",
+        "illustrious",
+        "imaginary",
+        "imaginative",
+        "immaculate",
+        "immaterial",
+        "immediate",
+        "immense",
+        "impassioned",
+        "impeccable",
+        "impartial",
+        "imperfect",
+        "imperturbable",
+        "impish",
+        "impolite",
+        "important",
+        "impossible",
+        "impractical",
+        "impressionable",
+        "impressive",
+        "improbable",
+        "impure",
+        "inborn",
+        "incomparable",
+        "incompatible",
+        "incomplete",
+        "inconsequential",
+        "incredible",
+        "indelible",
+        "inexperienced",
+        "indolent",
+        "infamous",
+        "infantile",
+        "infatuated",
+        "inferior",
+        "infinite",
+        "informal",
+        "innocent",
+        "insecure",
+        "insidious",
+        "insignificant",
+        "insistent",
+        "instructive",
+        "insubstantial",
+        "intelligent",
+        "intent",
+        "intentional",
+        "interesting",
+        "internal",
+        "international",
+        "intrepid",
+        "ironclad",
+        "irresponsible",
+        "irritating",
+        "itchy",
+        "jaded",
+        "jagged",
+        "jam-packed",
+        "jaunty",
+        "jealous",
+        "jittery",
+        "joint",
+        "jolly",
+        "jovial",
+        "joyful",
+        "joyous",
+        "jubilant",
+        "judicious",
+        "juicy",
+        "jumbo",
+        "junior",
+        "jumpy",
+        "juvenile",
+        "kaleidoscopic",
+        "keen",
+        "key",
+        "kind",
+        "kindhearted",
+        "kindly",
+        "klutzy",
+        "knobby",
+        "knotty",
+        "knowledgeable",
+        "knowing",
+        "known",
+        "kooky",
+        "kosher",
+        "lame",
+        "lanky",
+        "large",
+        "last",
+        "lasting",
+        "late",
+        "lavish",
+        "lawful",
+        "lazy",
+        "leading",
+        "lean",
+        "leafy",
+        "left",
+        "legal",
+        "legitimate",
+        "light",
+        "lighthearted",
+        "likable",
+        "likely",
+        "limited",
+        "limp",
+        "limping",
+        "linear",
+        "lined",
+        "liquid",
+        "little",
+        "live",
+        "lively",
+        "livid",
+        "loathsome",
+        "lone",
+        "lonely",
+        "long",
+        "long-term",
+        "loose",
+        "lopsided",
+        "lost",
+        "loud",
+        "lovable",
+        "lovely",
+        "loving",
+        "low",
+        "loyal",
+        "lucky",
+        "lumbering",
+        "luminous",
+        "lumpy",
+        "lustrous",
+        "luxurious",
+        "mad",
+        "made-up",
+        "magnificent",
+        "majestic",
+        "major",
+        "male",
+        "mammoth",
+        "married",
+        "marvelous",
+        "masculine",
+        "massive",
+        "mature",
+        "meager",
+        "mealy",
+        "mean",
+        "measly",
+        "meaty",
+        "medical",
+        "mediocre",
+        "medium",
+        "meek",
+        "mellow",
+        "melodic",
+        "memorable",
+        "menacing",
+        "merry",
+        "messy",
+        "metallic",
+        "mild",
+        "milky",
+        "mindless",
+        "miniature",
+        "minor",
+        "minty",
+        "miserable",
+        "miserly",
+        "misguided",
+        "misty",
+        "mixed",
+        "modern",
+        "modest",
+        "moist",
+        "monstrous",
+        "monthly",
+        "monumental",
+        "moral",
+        "mortified",
+        "motherly",
+        "motionless",
+        "mountainous",
+        "muddy",
+        "muffled",
+        "multicolored",
+        "mundane",
+        "murky",
+        "mushy",
+        "musty",
+        "muted",
+        "mysterious",
+        "naive",
+        "narrow",
+        "nasty",
+        "natural",
+        "naughty",
+        "nautical",
+        "near",
+        "neat",
+        "necessary",
+        "needy",
+        "negative",
+        "neglected",
+        "negligible",
+        "neighboring",
+        "nervous",
+        "new",
+        "next",
+        "nice",
+        "nifty",
+        "nimble",
+        "nippy",
+        "nocturnal",
+        "noisy",
+        "nonstop",
+        "normal",
+        "notable",
+        "noted",
+        "noteworthy",
+        "novel",
+        "noxious",
+        "numb",
+        "nutritious",
+        "nutty",
+        "obedient",
+        "obese",
+        "oblong",
+        "oily",
+        "obliging",
+        "obvious",
+        "occasional",
+        "odd",
+        "oddball",
+        "offbeat",
+        "offensive",
+        "official",
+        "old",
+        "old-fashioned",
+        "only",
+        "open",
+        "optimal",
+        "optimistic",
+        "opulent",
+        "orange",
+        "orderly",
+        "organic",
+        "ornate",
+        "ornery",
+        "ordinary",
+        "original",
+        "other",
+        "our",
+        "outlying",
+        "outgoing",
+        "outlandish",
+        "outrageous",
+        "outstanding",
+        "oval",
+        "overcooked",
+        "overdue",
+        "overjoyed",
+        "overlooked",
+        "palatable",
+        "pale",
+        "paltry",
+        "parallel",
+        "parched",
+        "partial",
+        "passionate",
+        "past",
+        "pastel",
+        "peaceful",
+        "peppery",
+        "perfect",
+        "perfumed",
+        "periodic",
+        "perky",
+        "personal",
+        "pertinent",
+        "pesky",
+        "pessimistic",
+        "petty",
+        "phony",
+        "physical",
+        "piercing",
+        "pink",
+        "pitiful",
+        "plain",
+        "plaintive",
+        "plastic",
+        "playful",
+        "pleasant",
+        "pleased",
+        "pleasing",
+        "plump",
+        "plush",
+        "polished",
+        "polite",
+        "political",
+        "pointed",
+        "pointless",
+        "poised",
+        "poor",
+        "popular",
+        "portly",
+        "posh",
+        "positive",
+        "possible",
+        "potable",
+        "powerful",
+        "powerless",
+        "practical",
+        "precious",
+        "present",
+        "prestigious",
+        "pretty",
+        "prevailing",
+        "previous",
+        "pricey",
+        "prickly",
+        "primary",
+        "prime",
+        "pristine",
+        "private",
+        "prize",
+        "probable",
+        "productive",
+        "profitable",
+        "profuse",
+        "proper",
+        "proud",
+        "prudent",
+        "punctual",
+        "pungent",
+        "puny",
+        "pure",
+        "purple",
+        "pushy",
+        "putrid",
+        "puzzled",
+        "puzzling",
+        "quaint",
+        "qualified",
+        "quarrelsome",
+        "quarterly",
+        "queasy",
+        "querulous",
+        "questionable",
+        "quick",
+        "quick-witted",
+        "quiet",
+        "quintessential",
+        "quirky",
+        "quixotic",
+        "quizzical",
+        "radiant",
+        "ragged",
+        "rapid",
+        "rare",
+        "rash",
+        "raw",
+        "recent",
+        "reckless",
+        "rectangular",
+        "ready",
+        "real",
+        "realistic",
+        "reasonable",
+        "red",
+        "reflecting",
+        "regal",
+        "regular",
+        "reliable",
+        "relieved",
+        "remarkable",
+        "remorseful",
+        "remote",
+        "repentant",
+        "required",
+        "respectful",
+        "responsible",
+        "repulsive",
+        "revolving",
+        "rewarding",
+        "rich",
+        "rigid",
+        "right",
+        "ringed",
+        "ripe",
+        "roasted",
+        "robust",
+        "rosy",
+        "rotating",
+        "rotten",
+        "rough",
+        "round",
+        "rowdy",
+        "royal",
+        "rubbery",
+        "rundown",
+        "ruddy",
+        "rude",
+        "runny",
+        "rural",
+        "rusty",
+        "sad",
+        "safe",
+        "salty",
+        "same",
+        "sandy",
+        "sane",
+        "sarcastic",
+        "sardonic",
+        "satisfied",
+        "scaly",
+        "scarce",
+        "scared",
+        "scary",
+        "scented",
+        "scholarly",
+        "scientific",
+        "scornful",
+        "scratchy",
+        "scrawny",
+        "second",
+        "secondary",
+        "second-hand",
+        "secret",
+        "self-assured",
+        "self-reliant",
+        "selfish",
+        "sentimental",
+        "separate",
+        "serene",
+        "serious",
+        "serpentine",
+        "several",
+        "severe",
+        "shabby",
+        "shadowy",
+        "shady",
+        "shallow",
+        "shameful",
+        "shameless",
+        "sharp",
+        "shimmering",
+        "shiny",
+        "shocked",
+        "shocking",
+        "shoddy",
+        "short",
+        "short-term",
+        "showy",
+        "shrill",
+        "shy",
+        "sick",
+        "silent",
+        "silky",
+        "silly",
+        "silver",
+        "similar",
+        "simple",
+        "simplistic",
+        "sinful",
+        "single",
+        "sizzling",
+        "skeletal",
+        "skinny",
+        "sleepy",
+        "slight",
+        "slim",
+        "slimy",
+        "slippery",
+        "slow",
+        "slushy",
+        "small",
+        "smart",
+        "smoggy",
+        "smooth",
+        "smug",
+        "snappy",
+        "snarling",
+        "sneaky",
+        "sniveling",
+        "snoopy",
+        "sociable",
+        "soft",
+        "soggy",
+        "solid",
+        "somber",
+        "some",
+        "spherical",
+        "sophisticated",
+        "sore",
+        "sorrowful",
+        "soulful",
+        "soupy",
+        "sour",
+        "spanish",
+        "sparkling",
+        "sparse",
+        "specific",
+        "spectacular",
+        "speedy",
+        "spicy",
+        "spiffy",
+        "spirited",
+        "spiteful",
+        "splendid",
+        "spotless",
+        "spotted",
+        "spry",
+        "square",
+        "squeaky",
+        "squiggly",
+        "stable",
+        "staid",
+        "stained",
+        "stale",
+        "standard",
+        "starchy",
+        "stark",
+        "starry",
+        "steep",
+        "sticky",
+        "stiff",
+        "stimulating",
+        "stingy",
+        "stormy",
+        "straight",
+        "strange",
+        "steel",
+        "strict",
+        "strident",
+        "striking",
+        "striped",
+        "strong",
+        "studious",
+        "stunning",
+        "stupendous",
+        "stupid",
+        "sturdy",
+        "stylish",
+        "subdued",
+        "submissive",
+        "substantial",
+        "subtle",
+        "suburban",
+        "sudden",
+        "sugary",
+        "sunny",
+        "super",
+        "superb",
+        "superficial",
+        "superior",
+        "supportive",
+        "sure-footed",
+        "surprised",
+        "suspicious",
+        "svelte",
+        "sweaty",
+        "sweet",
+        "sweltering",
+        "swift",
+        "sympathetic",
+        "tall",
+        "talkative",
+        "tame",
+        "tan",
+        "tangible",
+        "tart",
+        "tasty",
+        "tattered",
+        "taut",
+        "tedious",
+        "teeming",
+        "tempting",
+        "tender",
+        "tense",
+        "tepid",
+        "terrible",
+        "terrific",
+        "testy",
+        "thankful",
+        "that",
+        "these",
+        "thick",
+        "thin",
+        "third",
+        "thirsty",
+        "this",
+        "thorough",
+        "thorny",
+        "those",
+        "thoughtful",
+        "threadbare",
+        "thrifty",
+        "thunderous",
+        "tidy",
+        "tight",
+        "timely",
+        "tinted",
+        "tiny",
+        "tired",
+        "torn",
+        "total",
+        "tough",
+        "traumatic",
+        "treasured",
+        "tremendous",
+        "tragic",
+        "trained",
+        "trendy",
+        "triangular",
+        "tricky",
+        "trifling",
+        "trim",
+        "trivial",
+        "troubled",
+        "true",
+        "trusting",
+        "trustworthy",
+        "trusty",
+        "truthful",
+        "tubby",
+        "turbulent",
+        "twin",
+        "ugly",
+        "ultimate",
+        "unacceptable",
+        "unaware",
+        "uncomfortable",
+        "uncommon",
+        "unconscious",
+        "understated",
+        "unequaled",
+        "uneven",
+        "unfinished",
+        "unfit",
+        "unfolded",
+        "unfortunate",
+        "unhappy",
+        "unhealthy",
+        "uniform",
+        "unimportant",
+        "unique",
+        "united",
+        "unkempt",
+        "unknown",
+        "unlawful",
+        "unlined",
+        "unlucky",
+        "unnatural",
+        "unpleasant",
+        "unrealistic",
+        "unripe",
+        "unruly",
+        "unselfish",
+        "unsightly",
+        "unsteady",
+        "unsung",
+        "untidy",
+        "untimely",
+        "untried",
+        "untrue",
+        "unused",
+        "unusual",
+        "unwelcome",
+        "unwieldy",
+        "unwilling",
+        "unwitting",
+        "unwritten",
+        "upbeat",
+        "upright",
+        "upset",
+        "urban",
+        "usable",
+        "used",
+        "useful",
+        "useless",
+        "utilized",
+        "utter",
+        "vacant",
+        "vague",
+        "vain",
+        "valid",
+        "valuable",
+        "vapid",
+        "variable",
+        "vast",
+        "velvety",
+        "venerated",
+        "vengeful",
+        "verifiable",
+        "vibrant",
+        "vicious",
+        "victorious",
+        "vigilant",
+        "vigorous",
+        "villainous",
+        "violet",
+        "violent",
+        "virtual",
+        "virtuous",
+        "visible",
+        "vital",
+        "vivacious",
+        "vivid",
+        "voluminous",
+        "wan",
+        "warlike",
+        "warm",
+        "warmhearted",
+        "warped",
+        "wary",
+        "wasteful",
+        "watchful",
+        "waterlogged",
+        "watery",
+        "wavy",
+        "wealthy",
+        "weak",
+        "weary",
+        "webbed",
+        "wee",
+        "weekly",
+        "weepy",
+        "weighty",
+        "weird",
+        "welcome",
+        "well-documented",
+        "well-groomed",
+        "well-informed",
+        "well-lit",
+        "well-made",
+        "well-off",
+        "well-to-do",
+        "well-worn",
+        "wet",
+        "which",
+        "whimsical",
+        "whirlwind",
+        "whispered",
+        "white",
+        "whole",
+        "whopping",
+        "wicked",
+        "wide",
+        "wide-eyed",
+        "wiggly",
+        "wild",
+        "willing",
+        "wilted",
+        "winding",
+        "windy",
+        "winged",
+        "wiry",
+        "wise",
+        "witty",
+        "wobbly",
+        "woeful",
+        "wonderful",
+        "wooden",
+        "woozy",
+        "wordy",
+        "worldly",
+        "worn",
+        "worried",
+        "worrisome",
+        "worse",
+        "worst",
+        "worthless",
+        "worthwhile",
+        "worthy",
+        "wrathful",
+        "wretched",
+        "writhing",
+        "wrong",
+        "wry",
+        "yawning",
+        "yearly",
+        "yellow",
+        "yellowish",
+        "young",
+        "youthful",
+        "yummy",
+        "zany",
+        "zealous",
+        "zesty",
+        "zigzag",
+    ]
+)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 7bc00f2..3878975 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -397,7 +397,7 @@ impl PyDataFrameStage {
 
     /// returns the stage ids of that we need to read from in order to execute
     #[getter]
-    pub fn input_stage_ids(&self) -> PyResult<Vec<usize>> {
+    pub fn child_stage_ids(&self) -> PyResult<Vec<usize>> {
         let mut result = vec![];
         self.plan
             .clone()
diff --git a/src/ray_stage_reader.rs b/src/ray_stage_reader.rs
index 093ed4e..317fdcd 100644
--- a/src/ray_stage_reader.rs
+++ b/src/ray_stage_reader.rs
@@ -111,8 +111,11 @@ impl ExecutionPlan for RayStageReaderExec {
         let clients = client_map
             .get(&(self.stage_id, partition))
             .ok_or(internal_datafusion_err!(
-                "No flight clients found for {}",
-                self.stage_id
+                "{} No flight clients found for {}:{}, have {:?}",
+                name,
+                self.stage_id,
+                partition,
+                client_map.keys()
             ))?
             .lock()
             .iter()
@@ -138,17 +141,20 @@ impl ExecutionPlan for RayStageReaderExec {
 
             let mut streams = vec![];
             for mut client in clients {
+                let name = name.clone();
+                trace!("{name} Getting flight stream" );
                 match client.do_get(ticket.clone()).await {
                     Ok(flight_stream) => {
+                        trace!("{name} Got flight stream. headers:{:?}", 
flight_stream.headers());
                         let rbr_stream = 
RecordBatchStreamAdapter::new(schema.clone(),
                             flight_stream
-                                .map_err(|e| internal_datafusion_err!("Error 
consuming flight stream: {}", e)));
+                                .map_err(move |e| internal_datafusion_err!("{} 
Error consuming flight stream: {}", name, e)));
 
                         streams.push(Box::pin(rbr_stream) as 
SendableRecordBatchStream);
                     },
                     Err(e) => {
                         error = true;
-                        yield internal_err!("Error getting flight stream: {}", 
e);
+                        yield internal_err!("{} Error getting flight stream: 
{}", name, e);
                     }
                 }
             }
diff --git a/src/stage_service.rs b/src/stage_service.rs
index 040906d..0307a7b 100644
--- a/src/stage_service.rs
+++ b/src/stage_service.rs
@@ -20,15 +20,18 @@ use std::collections::HashMap;
 use std::error::Error;
 use std::sync::Arc;
 
+use arrow::array::RecordBatch;
+use arrow::datatypes::Schema;
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::error::FlightError;
 use arrow_flight::FlightClient;
 use datafusion::common::internal_datafusion_err;
 use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_python::utils::wait_for_future;
-use futures::TryStreamExt;
+use futures::{Stream, TryStreamExt};
 use local_ip_address::local_ip;
 use log::{debug, error, info, trace};
 use tokio::net::TcpListener;
@@ -42,11 +45,11 @@ use 
arrow_flight::{flight_service_server::FlightServiceServer, Ticket};
 
 use pyo3::prelude::*;
 
-use parking_lot::Mutex;
+use parking_lot::{Mutex, RwLock};
 
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 
-use crate::flight::{FlightHandler, FlightServ};
+use crate::flight::{DoGetStream, FlightHandler, FlightServ};
 use crate::isolator::PartitionGroup;
 use crate::util::{
     bytes_to_physical_plan, display_plan_with_partition_counts, 
extract_ticket, fix_plan,
@@ -61,45 +64,67 @@ pub(crate) struct ServiceClients(pub HashMap<(usize, 
usize), Mutex<Vec<FlightCli
 /// StageHandler is a [`FlightHandler`] that serves streams of partitions from 
a hosted Physical Plan
 /// It only responds to the DoGet Arrow Flight method.
 struct StageHandler {
+    /// our name, useful for logging
+    name: String,
+    /// Inner state of the handler: `None` until `update_plan` installs a plan
+    inner: RwLock<Option<StageHandlerInner>>,
+}
+
+struct StageHandlerInner { // everything needed to serve one hosted plan; swapped as a unit by StageHandler::update_plan
     /// our stage id that we are hosting
     pub(crate) stage_id: usize,
     /// the physical plan that comprises our stage
-    plan: Arc<dyn ExecutionPlan>,
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
     /// the session context we will use to execute the plan
-    ctx: Mutex<Option<SessionContext>>,
-    /// The partitions we will be hosting from this plan.
-    partition_group: Vec<usize>,
+    pub(crate) ctx: SessionContext,
 }
 
 impl StageHandler {
+    pub fn new(name: String) -> Self { // no plan is hosted until update_plan installs one
+        let inner = RwLock::new(None);
+
+        Self { name, inner }
+    }
+    async fn update_plan( // build a fresh StageHandlerInner and swap it in, replacing any previous plan
+        &self,
+        stage_id: usize,
+        stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
+        plan: Arc<dyn ExecutionPlan>,
+        partition_group: Vec<usize>,
+    ) -> DFResult<()> {
+        let inner = StageHandlerInner::new(stage_id, stage_addrs, plan, 
partition_group).await?;
+        self.inner.write().replace(inner); // only reached on success, so a failed update keeps the old plan
+        Ok(())
+    }
+
+    fn stage_id(&self) -> Option<usize> { // None until a plan has been installed
+        self.inner.read().as_ref().map(|i| i.stage_id)
+    }
+}
+
+impl StageHandlerInner {
     pub async fn new(
         stage_id: usize,
-        plan_bytes: &[u8],
+        stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
+        plan: Arc<dyn ExecutionPlan>,
         partition_group: Vec<usize>,
     ) -> DFResult<Self> {
-        let plan = bytes_to_physical_plan(&SessionContext::new(), plan_bytes)?;
-        let plan = fix_plan(plan)?;
-        debug!(
-            "StageHandler::new [Stage:{}], plan:\n{}",
-            stage_id,
-            display_plan_with_partition_counts(&plan)
-        );
-
-        let ctx = Mutex::new(None);
+        let ctx = Self::configure_ctx(stage_id, stage_addrs, &plan, 
partition_group).await?;
 
         Ok(Self {
             stage_id,
             plan,
             ctx,
-            partition_group,
         })
     }
 
     async fn configure_ctx(
-        &self,
+        stage_id: usize,
         stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
-    ) -> DFResult<()> {
-        let stage_ids_i_need = input_stage_ids(&self.plan)?;
+        plan: &Arc<dyn ExecutionPlan>,
+        partition_group: Vec<usize>,
+    ) -> DFResult<SessionContext> {
+        let stage_ids_i_need = input_stage_ids(&plan)?;
 
         // map of stage_id, partition -> Vec<FlightClient>
         let mut client_map = HashMap::new();
@@ -141,7 +166,7 @@ impl StageHandler {
 
         // this only matters if the plan includes an PartitionIsolatorExec, 
which looks for this
         // for this extension and will be ignored otherwise
-        config = 
config.with_extension(Arc::new(PartitionGroup(self.partition_group.clone())));
+        config = 
config.with_extension(Arc::new(PartitionGroup(partition_group.clone())));
 
         let state = SessionStateBuilder::new()
             .with_default_features()
@@ -149,12 +174,33 @@ impl StageHandler {
             .build();
         let ctx = SessionContext::new_with_state(state);
 
-        self.ctx.lock().replace(ctx);
-        trace!("ctx configured for stage {}", self.stage_id);
-        Ok(())
+        trace!("ctx configured for stage {}", stage_id);
+
+        Ok(ctx)
     }
 }
 
+fn make_stream(
+    inner: &StageHandlerInner,
+    partition: usize,
+) -> Result<impl Stream<Item = Result<RecordBatch, FlightError>> + Send + 
'static, Status> {
+    let task_ctx = inner.ctx.task_ctx();
+
+    let stream = inner
+        .plan
+        .execute(partition, task_ctx)
+        .inspect_err(|e| {
+            error!(
+                "{}",
+                format!("Could not get partition stream from plan {e}")
+            )
+        })
+        .map_err(|e| Status::internal(format!("Could not get partition stream 
from plan {e}")))?
+        .map_err(|e| FlightError::from_external_error(Box::new(e)));
+
+    Ok(stream)
+}
+
 #[async_trait]
 impl FlightHandler for StageHandler {
     async fn get_stream(
@@ -168,41 +214,33 @@ impl FlightHandler for StageHandler {
 
         let ticket = request.into_inner();
 
-        let partition = extract_ticket(ticket)
-            .map_err(|e| Status::internal(format!("Unexpected error extracting 
ticket {e}")))?;
+        let partition = extract_ticket(ticket).map_err(|e| {
+            Status::internal(format!(
+                "{}, Unexpected error extracting ticket {e}",
+                self.name
+            ))
+        })?;
 
         trace!(
-            "StageService[Stage:{}], request for partition {} from {}",
-            self.stage_id,
+            "{}, request for partition {} from {}",
+            self.name,
             partition,
             remote_addr
         );
 
-        let task_ctx = self
-            .ctx
-            .lock()
-            .as_ref()
-            .ok_or(Status::internal(format!(
-                "Stage [{}] get_stream cannot find ctx",
-                self.stage_id
-            )))?
-            .task_ctx();
-
+        let name = self.name.clone();
         let stream = self
-            .plan
-            .execute(partition, task_ctx)
-            .inspect_err(|e| {
-                error!(
-                    "{}",
-                    format!("Could not get partition stream from plan {e}")
-                )
-            })
-            .map_err(|e| Status::internal(format!("Could not get partition 
stream from plan {e}")))?
-            .map_err(|e| FlightError::from_external_error(Box::new(e)));
+            .inner
+            .read()
+            .as_ref()
+            .map(|inner| make_stream(inner, partition))
+            .ok_or_else(|| Status::internal(format!("{} No inner found", 
&name)))??;
 
         let out_stream = FlightDataEncoderBuilder::new()
             .build(stream)
-            .map_err(|e| Status::internal(format!("Unexpected error building 
stream {e}")));
+            .map_err(move |e| {
+                Status::internal(format!("{} Unexpected error building stream 
{e}", name))
+            });
 
         Ok(Response::new(Box::pin(out_stream)))
     }
@@ -225,22 +263,15 @@ pub struct StageService {
 #[pymethods]
 impl StageService {
     #[new]
-    pub fn new(
-        py: Python,
-        stage_id: usize,
-        plan_bytes: &[u8],
-        partition_group: Vec<usize>,
-    ) -> PyResult<Self> {
+    pub fn new(name: String) -> PyResult<Self> {
+        let name = format!("[{}]", name);
         let listener = None;
         let addr = None;
 
         let (all_done_tx, all_done_rx) = channel(1);
         let all_done_tx = Arc::new(Mutex::new(all_done_tx));
-        let name = format!("StageService[{}]", stage_id);
 
-        let fut = StageHandler::new(stage_id, plan_bytes, partition_group);
-
-        let handler = Arc::new(wait_for_future(py, fut).to_py_err()?);
+        let handler = Arc::new(StageHandler::new(name.clone()));
 
         Ok(Self {
             name,
@@ -272,33 +303,65 @@ impl StageService {
 
     /// get the address of the listing socket for this service
     pub fn addr(&self) -> PyResult<String> {
-        self.addr
-            .clone()
-            .ok_or_else(|| PyErr::new::<pyo3::exceptions::PyException, 
_>("Couldn't get addr"))
+        self.addr.clone().ok_or_else(|| {
+            PyErr::new::<pyo3::exceptions::PyException, _>(format!(
+                "{},Couldn't get addr",
+                self.name
+            ))
+        })
     }
 
-    pub fn set_stage_addrs<'a>(
-        &mut self,
-        py: Python<'a>,
-        stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
-    ) -> PyResult<Bound<'a, PyAny>> {
-        let handler = self.handler.clone();
+    /// signal to the service that we can shutdown
+    ///
+    /// returns a python coroutine that should be awaited
+    pub fn all_done<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
+        let sender = self.all_done_tx.lock().clone();
+
         let fut = async move {
-            handler.configure_ctx(stage_addrs).await.to_py_err()?;
+            sender.send(()).await.to_py_err()?;
             Ok(())
         };
         pyo3_async_runtimes::tokio::future_into_py(py, fut)
     }
 
-    /// signal to the service that we can shutdown
+    /// replace the plan that this service was providing, we will do this when 
we want
+    /// to reuse the StageService for a subsequent query
+    ///
     /// returns a python coroutine that should be awaited
-    pub fn all_done<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
-        let sender = self.all_done_tx.lock().clone();
+    pub fn update_plan<'a>(
+        &self,
+        py: Python<'a>,
+        stage_id: usize,
+        stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
+        partition_group: Vec<usize>,
+        plan_bytes: &[u8],
+    ) -> PyResult<Bound<'a, PyAny>> {
+        let plan = bytes_to_physical_plan(&SessionContext::new(), plan_bytes)?;
 
+        debug!(
+            "{} Received New Plan: Stage:{} my addr: {}, partition_group {:?}, 
stage_addrs:\n{:?}\nplan:\n{}",
+            self.name,
+            stage_id,
+            self.addr()?,
+            partition_group,
+            stage_addrs,
+            display_plan_with_partition_counts(&plan)
+        );
+
+        let handler = self.handler.clone();
+        let name = self.name.clone();
         let fut = async move {
-            sender.send(()).await.to_py_err()?;
+            handler
+                .update_plan(stage_id, stage_addrs, plan, 
partition_group.clone())
+                .await
+                .to_py_err()?;
+            info!(
+                "{} [stage: {} pg:{:?}] updated plan",
+                name, stage_id, partition_group
+            );
             Ok(())
         };
+
         pyo3_async_runtimes::tokio::future_into_py(py, fut)
     }
 
@@ -309,7 +372,7 @@ impl StageService {
 
         let signal = async move {
             // TODO: handle Result
-            let _ = all_done_rx.recv().await;
+            let result = all_done_rx.recv().await;
         };
 
         let service = FlightServ {
@@ -319,11 +382,9 @@ impl StageService {
         let svc = FlightServiceServer::new(service);
 
         let listener = self.listener.take().unwrap();
-
         let name = self.name.clone();
-        let stage_id = self.handler.stage_id;
+
         let serv = async move {
-            trace!("StageService [{}] Serving", stage_id);
             Server::builder()
                 .add_service(svc)
                 .serve_with_incoming_shutdown(
@@ -331,9 +392,8 @@ impl StageService {
                     signal,
                 )
                 .await
-                .inspect_err(|e| error!("StageService [{}] ERROR serving {e}", 
name))
+                .inspect_err(|e| error!("{}, ERROR serving {e}", name))
                 .map_err(|e| PyErr::new::<pyo3::exceptions::PyException, 
_>(format!("{e}")))?;
-            info!("tageService [{}] DONE serving", name);
             Ok::<(), Box<dyn Error + Send + Sync>>(())
         };
 
diff --git a/tpch/tpc.py b/tpch/tpc.py
index 6c26bf8..8fa5ae8 100644
--- a/tpch/tpc.py
+++ b/tpch/tpc.py
@@ -14,18 +14,28 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
+#
+#
+# This file is useful for running a query against a TPCH dataset.
+#
+# You can run an arbitrary query by passing --query 'select...' or you can run 
a
+# TPCH query by passing --qnum 1-22.
 
 import argparse
 import ray
-from datafusion import SessionContext, SessionConfig
-from datafusion_ray import RayContext, prettify, runtime_env
-from datetime import datetime
-import json
+from datafusion_ray import RayContext, runtime_env
 import os
+import sys
 import time
 
-import duckdb
-from datafusion.object_store import AmazonS3
+try:
+    import duckdb
+except ImportError:
+    print(
+        "duckdb not installed, which is used in this file for retrieving the 
TPCH query"
+    )
+    sys.exit(1)
 
 
 def make_ctx(
@@ -33,6 +43,7 @@ def make_ctx(
     concurrency: int,
     batch_size: int,
     partitions_per_worker: int | None,
+    worker_pool_min: int,
     listing_tables: bool,
 ):
 
@@ -51,7 +62,11 @@ def make_ctx(
     # use ray job submit
     ray.init(runtime_env=runtime_env)
 
-    ctx = RayContext(batch_size=batch_size, 
partitions_per_worker=partitions_per_worker)
+    ctx = RayContext(
+        batch_size=batch_size,
+        partitions_per_worker=partitions_per_worker,
+        worker_pool_min=worker_pool_min,
+    )
 
     ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
     # ctx.set("datafusion.execution.parquet.pushdown_filters", "true")
@@ -75,19 +90,19 @@ def main(
     batch_size: int,
     query: str,
     partitions_per_worker: int | None,
-    validate: bool,
+    worker_pool_min: int,
     listing_tables,
 ) -> None:
     ctx = make_ctx(
-        data_path, concurrency, batch_size, partitions_per_worker, 
listing_tables
+        data_path,
+        concurrency,
+        batch_size,
+        partitions_per_worker,
+        worker_pool_min,
+        listing_tables,
     )
     df = ctx.sql(query)
-    for stage in df.stages():
-        print(
-            f"Stage {stage.stage_id} output 
partitions:{stage.num_output_partitions} partition_groups: 
{stage.partition_groups}"
-        )
-        print(stage.execution_plan().display_indent())
-
+    time.sleep(3)
     df.show()
 
 
@@ -110,7 +125,11 @@ if __name__ == "__main__":
         type=int,
         help="Max partitions per Stage Service Worker",
     )
-    parser.add_argument("--validate", action="store_true")
+    parser.add_argument(
+        "--worker-pool-min",
+        type=int,
+        help="Minimum number of RayStages to keep in pool",
+    )
     parser.add_argument("--listing-tables", action="store_true")
     args = parser.parse_args()
 
@@ -125,6 +144,6 @@ if __name__ == "__main__":
         int(args.batch_size),
         query,
         args.partitions_per_worker,
-        args.validate,
+        args.worker_pool_min,
         args.listing_tables,
     )
diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py
index 1a93360..f8a9dbc 100644
--- a/tpch/tpcbench.py
+++ b/tpch/tpcbench.py
@@ -24,7 +24,13 @@ import json
 import os
 import time
 
-import duckdb
+try:
+    import duckdb
+except ImportError:
+    print(
+        "duckdb not installed, which is used in this file for retrieving the 
TPCH query"
+    )
+    sys.exit(1)
 
 
 def tpch_query(qnum: int) -> str:
@@ -38,6 +44,7 @@ def main(
     concurrency: int,
     batch_size: int,
     partitions_per_worker: int | None,
+    worker_pool_min: int,
     listing_tables: bool,
     validate: bool,
     prefetch_buffer_size: int,
@@ -62,6 +69,7 @@ def main(
         batch_size=batch_size,
         partitions_per_worker=partitions_per_worker,
         prefetch_buffer_size=prefetch_buffer_size,
+        worker_pool_min=worker_pool_min,
     )
 
     ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
@@ -189,6 +197,11 @@ if __name__ == "__main__":
         type=int,
         help="How many batches each stage should eagerly buffer",
     )
+    parser.add_argument(
+        "--worker-pool-min",
+        type=int,
+        help="Minimum number of RayStages to keep in pool",
+    )
 
     args = parser.parse_args()
 
@@ -198,6 +211,7 @@ if __name__ == "__main__":
         int(args.concurrency),
         int(args.batch_size),
         args.partitions_per_worker,
+        args.worker_pool_min,
         args.listing_tables,
         args.validate,
         args.prefetch_buffer_size,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to