This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git
The following commit(s) were added to refs/heads/main by this push:
new 116734d Preallocate Ray Workers (#62)
116734d is described below
commit 116734dd4e48a74fbe6a11b936c1b32d2c1c31c8
Author: robtandy <[email protected]>
AuthorDate: Thu Feb 20 11:20:03 2025 -0500
Preallocate Ray Workers (#62)
* Refactor to preallocate a pool of RayStage Actors
* corrected version of datafusion in requirements-in.txt
---
datafusion_ray/__init__.py | 2 +-
datafusion_ray/core.py | 619 +++++++++++------
datafusion_ray/friendly.py | 1588 ++++++++++++++++++++++++++++++++++++++++++++
src/dataframe.rs | 2 +-
src/ray_stage_reader.rs | 14 +-
src/stage_service.rs | 220 +++---
tpch/tpc.py | 53 +-
tpch/tpcbench.py | 16 +-
8 files changed, 2194 insertions(+), 320 deletions(-)
diff --git a/datafusion_ray/__init__.py b/datafusion_ray/__init__.py
index a920253..d26afa8 100644
--- a/datafusion_ray/__init__.py
+++ b/datafusion_ray/__init__.py
@@ -20,6 +20,6 @@ try:
except ImportError:
import importlib_metadata
-from .core import RayContext, prettify, runtime_env
+from .core import RayContext, prettify, runtime_env, RayStagePool
__version__ = importlib_metadata.version(__name__)
diff --git a/datafusion_ray/core.py b/datafusion_ray/core.py
index 3d36941..d9c9742 100644
--- a/datafusion_ray/core.py
+++ b/datafusion_ray/core.py
@@ -17,14 +17,16 @@
from collections import defaultdict
+from dataclasses import dataclass
import logging
import os
import pyarrow as pa
import asyncio
import ray
-import uuid
+import json
import time
-from typing import Optional
+
+from .friendly import new_friendly_name
from datafusion_ray._datafusion_ray_internal import (
RayContext as RayContextInternal,
@@ -65,15 +67,11 @@ def call_sync(coro):
"""call a coroutine in the current event loop or run a new one, and
synchronously
return the result"""
try:
- try:
- loop = asyncio.get_running_loop()
- except RuntimeError:
- return asyncio.run(coro)
- else:
- return loop.run_until_complete(coro)
- except Exception as e:
- log.error(f"Error in call: {e}")
- log.exception(e)
+ loop = asyncio.get_running_loop()
+ except RuntimeError:
+ return asyncio.run(coro)
+ else:
+ return loop.run_until_complete(coro)
# work around for https://github.com/ray-project/ray/issues/31606
@@ -82,30 +80,376 @@ async def _ensure_coro(maybe_obj_ref):
async def wait_for(coros, name=""):
+ """Wait for all coros to complete and return their results.
+ Does not preserve ordering."""
+
return_values = []
# wrap the coro in a task to work with python 3.10 and 3.11+ where
asyncio.wait semantics
# changed to not accept any awaitable
+ start = time.time()
done, _ = await asyncio.wait([asyncio.create_task(_ensure_coro(c)) for c
in coros])
+ end = time.time()
+ log.info(f"waiting for {name} took {end - start}s")
for d in done:
e = d.exception()
if e is not None:
log.error(f"Exception waiting {name}: {e}")
+ raise e
else:
return_values.append(d.result())
return return_values
+class RayStagePool:
+ """A pool of RayStage actors that can be acquired and released"""
+
+ # TODO: We can probably manage this set in a better way
+ # This is not a threadsafe implementation.
+ # This is simple though and will suffice for now
+
+ def __init__(self, min_workers: int, max_workers: int):
+ self.min_workers = min_workers
+ self.max_workers = max_workers
+
+ # a map of stage_key (a random identifier) to stage actor reference
+ self.pool = {}
+ # a map of stage_key to listening address
+ self.addrs = {}
+
+ # holds object references from the start_up method for each stage
+ # we know all stages are listening when all of these refs have
+ # been waited on. When they are ready we remove them from this set
+ self.stages_started = set()
+
+ # an event that is set when all stages are ready to serve
+ self.stages_ready = asyncio.Event()
+
+ # stages that are started but we need to get their address
+ self.need_address = set()
+
+ # stages that we have the address for but need to start serving
+ self.need_serving = set()
+
+ # stages in use
+ self.acquired = set()
+
+ # stages available
+ self.available = set()
+
+ for _ in range(min_workers):
+ self._new_stage()
+
+ log.info(
+ f"created ray stage pool (min_workers: {min_workers}, max_workers:
{max_workers})"
+ )
+
+ async def start(self):
+ if not self.stages_ready.is_set():
+ await self._wait_for_stages_started()
+ await self._wait_for_get_addrs()
+ await self._wait_for_serve()
+ self.stages_ready.set()
+
+ async def wait_for_ready(self):
+ await self.stages_ready.wait()
+
+ async def acquire(self, need=1):
+ stage_keys = []
+
+ have = len(self.available)
+ total = len(self.available) + len(self.acquired)
+ can_make = self.max_workers - total
+
+ need_to_make = need - have
+
+ if need_to_make > can_make:
+ raise Exception(f"Cannot allocate workers above
{self.max_workers}")
+
+ if need_to_make > 0:
+ log.debug(f"creating {need_to_make} additional stages")
+ for _ in range(need_to_make):
+ self._new_stage()
+ await wait_for([self.start()], "waiting for created stages")
+
+ assert len(self.available) >= need
+
+ for _ in range(need):
+ stage_key = self.available.pop()
+ self.acquired.add(stage_key)
+
+ stage_keys.append(stage_key)
+
+ stages = [self.pool[sk] for sk in stage_keys]
+ addrs = [self.addrs[sk] for sk in stage_keys]
+ return (stages, stage_keys, addrs)
+
+ def release(self, stage_keys: list[str]):
+ for stage_key in stage_keys:
+ self.acquired.remove(stage_key)
+ self.available.add(stage_key)
+
+ def _new_stage(self):
+ self.stages_ready.clear()
+ stage_key = new_friendly_name()
+ log.debug(f"starting stage: {stage_key}")
+ stage = RayStage.options(name=f"Stage: {stage_key}").remote(stage_key)
+ self.pool[stage_key] = stage
+ self.stages_started.add(stage.start_up.remote())
+ self.available.add(stage_key)
+
+ async def _wait_for_stages_started(self):
+ log.info("waiting for stages to be ready")
+ started_keys = await wait_for(self.stages_started, "stages to be
started")
+ # we need the addresses of these stages still
+ self.need_address.update(set(started_keys))
+ # we've started all the stages we know about
+ self.stages_started = set()
+ log.info("stages are all listening")
+
+ async def _wait_for_get_addrs(self):
+ # get the addresses in a pipelined fashion
+ refs = []
+ stage_keys = []
+ for stage_key in self.need_address:
+ stage = self.pool[stage_key]
+ refs.append(stage.addr.remote())
+ stage_keys.append(stage_key)
+
+ self.need_serving.add(stage_key)
+
+ addrs = await wait_for(refs, "stage addresses")
+
+ for key, addr in addrs:
+ self.addrs[key] = addr
+
+ self.need_address = set()
+
+ async def _wait_for_serve(self):
+ log.info("running stages")
+ try:
+ for stage_key in self.need_serving:
+ log.info(f"starting serving of stage {stage_key}")
+ stage = self.pool[stage_key]
+ stage.serve.remote()
+ self.need_serving = set()
+
+ except Exception as e:
+ log.error(f"StagePool: Uhandled Exception in serve: {e}")
+ raise e
+
+ async def all_done(self):
+ log.info("calling stage all done")
+ refs = [stage.all_done.remote() for stage in self.pool.values()]
+ await wait_for(refs, "stages to be all done")
+ log.info("all stages shutdown")
+
+
[email protected](num_cpus=0)
+class RayStage:
+ def __init__(self, stage_key):
+ self.stage_key = stage_key
+
+ # import this here so ray doesn't try to serialize the rust extension
+ from datafusion_ray._datafusion_ray_internal import StageService
+
+ self.stage_service = StageService(stage_key)
+
+ async def start_up(self):
+ # this method is sync
+ self.stage_service.start_up()
+ return self.stage_key
+
+ async def all_done(self):
+ await self.stage_service.all_done()
+
+ async def addr(self):
+ return (self.stage_key, self.stage_service.addr())
+
+ async def update_plan(
+ self,
+ stage_id: int,
+ stage_addrs: dict[int, dict[int, list[str]]],
+ partition_group: list[int],
+ plan_bytes: bytes,
+ ):
+ await self.stage_service.update_plan(
+ stage_id,
+ stage_addrs,
+ partition_group,
+ plan_bytes,
+ )
+
+ async def serve(self):
+ log.info(f"[{self.stage_key}] serving on {self.stage_service.addr()}")
+ await self.stage_service.serve()
+ log.info(f"[{self.stage_key}] done serving")
+
+
+@dataclass
+class StageData:
+ stage_id: int
+ plan_bytes: bytes
+ partition_group: list[int]
+ child_stage_ids: list[int]
+ num_output_partitions: int
+ full_partitions: bool
+
+
+@dataclass
+class InternalStageData:
+ stage_id: int
+ plan_bytes: bytes
+ partition_group: list[int]
+ child_stage_ids: list[int]
+ num_output_partitions: int
+ full_partitions: bool
+ remote_stage: ... # ray.actor.ActorHandle[RayStage]
+ remote_addr: str
+
+ def __str__(self):
+ return f"""Stage: {self.stage_id}, pg: {self.partition_group},
child_stages:{self.child_stage_ids}, listening addr:{self.remote_addr}"""
+
+
[email protected](num_cpus=0)
+class RayContextSupervisor:
+ def __init__(
+ self,
+ worker_pool_min: int,
+ worker_pool_max: int,
+ ) -> None:
+ log.info(f"Creating RayContextSupervisor worker_pool_min:
{worker_pool_min}")
+ self.pool = RayStagePool(worker_pool_min, worker_pool_max)
+ self.stages: dict[str, InternalStageData] = {}
+ log.info("Created RayContextSupervisor")
+
+ async def start(self):
+ await self.pool.start()
+
+ async def wait_for_ready(self):
+ await self.pool.wait_for_ready()
+
+ async def get_stage_addrs(self, stage_id: int):
+ addrs = [
+ sd.remote_addr for sd in self.stages.values() if sd.stage_id ==
stage_id
+ ]
+ return addrs
+
+ async def new_query(
+ self,
+ stage_datas: list[StageData],
+ ):
+ if len(self.stages) > 0:
+ self.pool.release(list(self.stages.keys()))
+
+ remote_stages, remote_stage_keys, remote_addrs = await
self.pool.acquire(
+ len(stage_datas)
+ )
+ self.stages = {}
+
+ for i, sd in enumerate(stage_datas):
+ remote_stage = remote_stages[i]
+ remote_stage_key = remote_stage_keys[i]
+ remote_addr = remote_addrs[i]
+ self.stages[remote_stage_key] = InternalStageData(
+ sd.stage_id,
+ sd.plan_bytes,
+ sd.partition_group,
+ sd.child_stage_ids,
+ sd.num_output_partitions,
+ sd.full_partitions,
+ remote_stage,
+ remote_addr,
+ )
+
+ # sort out the mess of who talks to whom and ensure we can supply the
correct
+ # addresses to each of them
+ addrs_by_stage_key = await self.sort_out_addresses()
+ if log.level <= logging.DEBUG:
+ # TODO: string builder here
+ out = ""
+ for stage_key, stage in self.stages.items():
+ out += f"[{stage_key}]: {stage}\n"
+ out += f"child addrs: {addrs_by_stage_key[stage_key]}\n"
+ log.debug(out)
+
+ refs = []
+ # now tell the stages what they are doing for this query
+ for stage_key, isd in self.stages.items():
+ log.info(f"going to update plan for {stage_key}")
+ kid = addrs_by_stage_key[stage_key]
+ refs.append(
+ isd.remote_stage.update_plan.remote(
+ isd.stage_id,
+ {stage_id: val["child_addrs"] for (stage_id, val) in
kid.items()},
+ isd.partition_group,
+ isd.plan_bytes,
+ )
+ )
+ log.info("that's all of them")
+
+ await wait_for(refs, "updating plans")
+
+ async def sort_out_addresses(self):
+ """Iterate through our stages and gather all of their listening
addresses.
+ Then, provide the addresses to of peer stages to each stage.
+ """
+ addrs_by_stage_key = {}
+ for stage_key, isd in self.stages.items():
+ stage_addrs = defaultdict(dict)
+
+ # using "isd" as shorthand to denote InternalStageData as a
reminder
+
+ for child_stage_id in isd.child_stage_ids:
+ addrs = defaultdict(list)
+ child_stage_keys, child_stage_datas = zip(
+ *filter(
+ lambda x: x[1].stage_id == child_stage_id,
+ self.stages.items(),
+ )
+ )
+ output_partitions = [
+ c_isd.num_output_partitions for c_isd in child_stage_datas
+ ]
+
+ # sanity check
+ assert all([op == output_partitions[0] for op in
output_partitions])
+ output_partitions = output_partitions[0]
+
+ for child_stage_isd in child_stage_datas:
+ if child_stage_isd.full_partitions:
+ for partition in range(output_partitions):
+ # this stage is the definitive place to read this
output partition
+ addrs[partition] = [child_stage_isd.remote_addr]
+ else:
+ for partition in range(output_partitions):
+ # this output partition must be gathered from all
stages with this stage_id
+ addrs[partition] = [
+ c.remote_addr for c in child_stage_datas
+ ]
+
+ stage_addrs[child_stage_id]["child_addrs"] = addrs
+ # not necessary but useful for debug logs
+ stage_addrs[child_stage_id]["stage_keys"] = child_stage_keys
+
+ addrs_by_stage_key[stage_key] = stage_addrs
+
+ return addrs_by_stage_key
+
+ async def all_done(self):
+ await self.pool.all_done()
+
+
class RayDataFrame:
def __init__(
self,
ray_internal_df: RayDataFrameInternal,
- query_id: str,
+ supervisor, # ray.actor.ActorHandle[RayContextSupervisor],
batch_size=8192,
- partitions_per_worker: Optional[int] = None,
+ partitions_per_worker: int | None = None,
prefetch_buffer_size=0,
):
self.df = ray_internal_df
- self.query_id = query_id
+ self.supervisor = supervisor
self._stages = None
self._batches = None
self.batch_size = batch_size
@@ -119,12 +463,6 @@ class RayDataFrame:
self.batch_size, self.prefetch_buffer_size,
self.partitions_per_worker
)
- self.coord = RayStageCoordinator.options(
- name="RayQueryCoordinator:" + self.query_id,
- ).remote(
- self.query_id,
- )
-
return self._stages
def execution_plan(self):
@@ -143,19 +481,19 @@ class RayDataFrame:
t2 = time.time()
log.debug(f"creating stages took {t2 -t1}s")
- last_stage = max([stage.stage_id for stage in self._stages])
- log.debug(f"last stage is {last_stage}")
+ last_stage_id = max([stage.stage_id for stage in self._stages])
+ log.debug(f"last stage is {last_stage_id}")
self.create_ray_stages()
- t3 = time.time()
- log.debug(f"creating ray stage actors took {t3 -t2}s")
- self.run_stages()
- addrs = ray.get(self.coord.get_stage_addrs.remote())
+ last_stage_addrs = ray.get(
+ self.supervisor.get_stage_addrs.remote(last_stage_id)
+ )
+ log.debug(f"last stage addrs {last_stage_addrs}")
- reader = self.df.read_final_stage(last_stage,
addrs[last_stage][0][0])
+ reader = self.df.read_final_stage(last_stage_id,
last_stage_addrs[0])
+ log.debug("got reader")
self._batches = list(reader)
- self.coord.all_done.remote()
return self._batches
def show(self) -> None:
@@ -163,31 +501,27 @@ class RayDataFrame:
print(prettify(batches))
def create_ray_stages(self):
+ stage_datas = []
- # if we are doing each partition separate (isolate_partitions =True)
- # then the plan generated will include a PartitionIsolator which
- # will take care of that. Our job is to then launch a stage for each
- # partition.
- #
- refs = []
+ # note, whereas the PyDataFrameStage object contained in self.stages()
+ # holds information for a numbered stage,
+ # when we tell the supervisor about our query, it wants a StageData
+ # object per actor that will be created. Hence the loop over
partition_groups
for stage in self.stages():
for partition_group in stage.partition_groups:
- refs.append(
- self.coord.new_stage.remote(
+ stage_datas.append(
+ StageData(
stage.stage_id,
stage.plan_bytes(),
partition_group,
+ stage.child_stage_ids,
stage.num_output_partitions,
stage.full_partitions,
)
)
- # wait for all stages to be created
- # ray.wait(refs, num_returns=len(refs))
- call_sync(wait_for(refs, "creating ray stages"))
-
- def run_stages(self):
- self.coord.serve.remote()
+ ref = self.supervisor.new_query.remote(stage_datas)
+ call_sync(wait_for([ref], "creating ray stages"))
class RayContext:
@@ -195,13 +529,32 @@ class RayContext:
self,
batch_size: int = 8192,
prefetch_buffer_size: int = 0,
- partitions_per_worker: Optional[int] = None,
+ partitions_per_worker: int | None = None,
+ worker_pool_min: int = 1,
+ worker_pool_max: int = 100,
) -> None:
self.ctx = RayContextInternal()
self.batch_size = batch_size
self.partitions_per_worker = partitions_per_worker
self.prefetch_buffer_size = prefetch_buffer_size
+ self.supervisor = RayContextSupervisor.options(
+ name="RayContextSupersisor",
+ ).remote(
+ worker_pool_min,
+ worker_pool_max,
+ )
+
+ # start up our super visor and don't check in on it until its
+ # time to query, then we will await this ref
+ start_ref = self.supervisor.start.remote()
+
+ # ensure we are ready
+ s = time.time()
+ call_sync(wait_for([start_ref], "RayContextSupervisor start"))
+ e = time.time()
+ log.info(f"RayContext::__init__ waiting for supervisor to be ready
took {e-s}s")
+
def register_parquet(self, name: str, path: str):
self.ctx.register_parquet(name, path)
@@ -209,12 +562,12 @@ class RayContext:
self.ctx.register_listing_table(name, path, file_extention)
def sql(self, query: str) -> RayDataFrame:
- query_id = str(uuid.uuid4())
df = self.ctx.sql(query)
+
return RayDataFrame(
df,
- query_id,
+ self.supervisor,
self.batch_size,
self.partitions_per_worker,
self.prefetch_buffer_size,
@@ -223,175 +576,9 @@ class RayContext:
def set(self, option: str, value: str) -> None:
self.ctx.set(option, value)
+ def __del__(self):
+ log.info("RayContext, cleaning up remote resources")
+ ref = self.supervisor.all_done.remote()
+ call_sync(wait_for([ref], "RayContextSupervisor all done"))
[email protected](num_cpus=0)
-class RayStageCoordinator:
- def __init__(
- self,
- query_id: str,
- ) -> None:
- self.query_id = query_id
- self.stages = {}
- self.stage_addrs = defaultdict(lambda: defaultdict(list))
- self.output_partitions = {}
- self.stages_started = []
- self.stages_ready = asyncio.Event()
-
- async def all_done(self):
- log.debug("calling stage all done")
- refs = [stage.all_done.remote() for stage in self.stages.values()]
- # ray.wait(refs, num_returns=len(refs))
- await wait_for(refs, "stages to be all done")
- log.debug("done stage all done")
-
- async def new_stage(
- self,
- stage_id: int,
- plan_bytes: bytes,
- partition_group: list[int],
- num_output_partitions: int,
- full_partitions: bool,
- ):
-
- try:
- if stage_id in self.output_partitions:
- assert self.output_partitions[stage_id] ==
num_output_partitions
- else:
- self.output_partitions[stage_id] = num_output_partitions
-
- # we need a tuple so its hashable
- partition_set = tuple(partition_group)
- stage_key = (stage_id, partition_set, full_partitions)
-
- log.debug(f"creating new stage {stage_key} from bytes
{len(plan_bytes)}")
- stage = RayStage.options(
- name=f"Stage: {stage_key}, query_id:{self.query_id}",
- ).remote(stage_id, plan_bytes, partition_group)
- self.stages[stage_key] = stage
- self.stages_started.append(stage.start_up.remote())
-
- except Exception as e:
- log.error(
- f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in
new stage! {e}"
- )
- raise e
-
- async def wait_for_stages_ready(self):
- log.debug("waiting for stages to be ready")
- await self.stages_ready.wait()
-
- async def ensure_stages_ready(self):
- # ray.wait(self.stages_started, num_returns=len(self.stages_started))
- log.debug(f"going to wait for {self.stages_started}")
- await wait_for(self.stages_started, "stages to be started")
- await self.sort_out_addresses()
- log.info("all stages ready")
- self.stages_ready.set()
-
- async def get_stage_addrs(self) -> dict[int, list[str]]:
- log.debug("Checking to ensure stages are ready before returning addrs")
- await self.wait_for_stages_ready()
- log.debug("Looks like they are ready")
- return self.stage_addrs
-
- async def sort_out_addresses(self):
- """Iterate through our stages and gather all of their listening
addresses.
- Then, provide the addresses to of peer stages to each stage.
- """
-
- # first go get all addresses from the stages we launched, concurrently
- # pipeline this by firing up all tasks before awaiting any results
- addrs_by_stage = defaultdict(list)
- addrs_by_stage_partition = defaultdict(dict)
- for stage_key, stage in self.stages.items():
- stage_id, partition_set, full_partitions = stage_key
- a_future = stage.addr.remote()
- addrs_by_stage[stage_id].append(a_future)
- for partition in partition_set:
- addrs_by_stage_partition[stage_id][partition] = a_future
-
- for stage_key, stage in self.stages.items():
- stage_id, partition_set, full_partitions = stage_key
- if full_partitions:
- for partition in range(self.output_partitions[stage_id]):
- self.stage_addrs[stage_id][partition] = await wait_for(
- [addrs_by_stage_partition[stage_id][partition]]
- )
- else:
- for partition in range(self.output_partitions[stage_id]):
- self.stage_addrs[stage_id][partition] = await wait_for(
- addrs_by_stage[stage_id]
- )
-
- if log.level <= logging.DEBUG:
- out = ""
- for stage_id, partition_addrs in self.stage_addrs.items():
- out += f"Stage {stage_id}: \n"
- for partition, addrs in partition_addrs.items():
- out += f" partition {partition}: {addrs}\n"
- log.debug(f"stage_addrs:\n{out}")
- # now update all the stages with the addresses of peers such
- # that they can contact their child stages
- refs = []
- for stage_key, stage in self.stages.items():
- refs.append(stage.set_stage_addrs.remote(self.stage_addrs))
-
- # ray.wait(refs, num_returns=len(refs))
- await wait_for(refs, "stages to to have addrs set")
- log.debug("all stage addrs set? or should be")
-
- async def serve(self):
- await self.ensure_stages_ready()
- log.info("running stages")
- try:
- for stage_key, stage in self.stages.items():
- log.info(f"starting serving of stage {stage_key}")
- stage.serve.remote()
-
- except Exception as e:
- log.error(
- f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in
run stages! {e}"
- )
- raise e
-
-
[email protected](num_cpus=0)
-class RayStage:
- def __init__(
- self,
- stage_id: int,
- plan_bytes: bytes,
- partition_group: list[int],
- ):
-
- from datafusion_ray._datafusion_ray_internal import StageService
-
- try:
- self.stage_id = stage_id
- self.stage_service = StageService(
- stage_id,
- plan_bytes,
- partition_group,
- )
- except Exception as e:
- log.error(
- f"StageService[{self.stage_id}{partition_group}] Unhandled
Exception in init: {e}!"
- )
- raise
-
- async def start_up(self):
- # this method is sync
- self.stage_service.start_up()
-
- async def all_done(self):
- await self.stage_service.all_done()
-
- async def addr(self):
- return self.stage_service.addr()
-
- async def set_stage_addrs(self, stage_addrs: dict[int, list[str]]):
- await self.stage_service.set_stage_addrs(stage_addrs)
-
- async def serve(self):
- await self.stage_service.serve()
- log.info("StageService done serving")
+ # log.debug("all stage addrs set? or should be")
diff --git a/datafusion_ray/friendly.py b/datafusion_ray/friendly.py
new file mode 100644
index 0000000..49d31e7
--- /dev/null
+++ b/datafusion_ray/friendly.py
@@ -0,0 +1,1588 @@
+# Adapted from http://en.wikipedia.org/wiki/List_of_animal_names
+# and https://github.com/bryanmylee/zoo-ids
+
+import random
+
+
+def new_friendly_name():
+ name = random.choice(animals)
+ adj = adjectives.pop()
+ return f"{adj}-{name}"
+
+
+# In total, 221 animals
+animals = [
+ "aardvark",
+ "albatross",
+ "alligator",
+ "alpaca",
+ "ant",
+ "anteater",
+ "antelope",
+ "ape",
+ "armadillo",
+ "donkey",
+ "baboon",
+ "badger",
+ "barracuda",
+ "bat",
+ "bear",
+ "beaver",
+ "bee",
+ "bison",
+ "boar",
+ "buffalo",
+ "butterfly",
+ "camel",
+ "capybara",
+ "caribou",
+ "cassowary",
+ "cat",
+ "caterpillar",
+ "cattle",
+ "chamois",
+ "cheetah",
+ "chicken",
+ "chimpanzee",
+ "chinchilla",
+ "chough",
+ "clam",
+ "cobra",
+ "cockroach",
+ "cod",
+ "cormorant",
+ "coyote",
+ "crab",
+ "crane",
+ "crocodile",
+ "crow",
+ "curlew",
+ "deer",
+ "dinosaur",
+ "dog",
+ "dogfish",
+ "dolphin",
+ "dotterel",
+ "dove",
+ "dragonfly",
+ "duck",
+ "dugong",
+ "dunlin",
+ "eagle",
+ "echidna",
+ "eel",
+ "eland",
+ "elephant",
+ "elk",
+ "emu",
+ "falcon",
+ "ferret",
+ "finch",
+ "fish",
+ "flamingo",
+ "fly",
+ "fox",
+ "frog",
+ "gaur",
+ "gazelle",
+ "gerbil",
+ "giraffe",
+ "gnat",
+ "gnu",
+ "goat",
+ "goldfinch",
+ "goldfish",
+ "goose",
+ "gorilla",
+ "goshawk",
+ "grasshopper",
+ "grouse",
+ "guanaco",
+ "gull",
+ "hamster",
+ "hare",
+ "hawk",
+ "hedgehog",
+ "heron",
+ "herring",
+ "hippopotamus",
+ "hornet",
+ "horse",
+ "human",
+ "hummingbird",
+ "hyena",
+ "ibex",
+ "ibis",
+ "jackal",
+ "jaguar",
+ "jay",
+ "jellyfish",
+ "kangaroo",
+ "kingfisher",
+ "koala",
+ "kookabura",
+ "kouprey",
+ "kudu",
+ "lapwing",
+ "lark",
+ "lemur",
+ "leopard",
+ "lion",
+ "llama",
+ "lobster",
+ "locust",
+ "loris",
+ "louse",
+ "lyrebird",
+ "magpie",
+ "mallard",
+ "manatee",
+ "mandrill",
+ "mantis",
+ "marten",
+ "meerkat",
+ "mink",
+ "mole",
+ "mongoose",
+ "monkey",
+ "moose",
+ "mosquito",
+ "mouse",
+ "mule",
+ "narwhal",
+ "newt",
+ "nightingale",
+ "octopus",
+ "okapi",
+ "opossum",
+ "oryx",
+ "ostrich",
+ "otter",
+ "owl",
+ "oyster",
+ "panther",
+ "parrot",
+ "partridge",
+ "peafowl",
+ "pelican",
+ "penguin",
+ "pheasant",
+ "pig",
+ "pigeon",
+ "pony",
+ "porcupine",
+ "porpoise",
+ "quail",
+ "quelea",
+ "quetzal",
+ "rabbit",
+ "raccoon",
+ "rail",
+ "ram",
+ "rat",
+ "raven",
+ "reindeer",
+ "rhinoceros",
+ "rook",
+ "salamander",
+ "salmon",
+ "sandpiper",
+ "sardine",
+ "scorpion",
+ "seahorse",
+ "seal",
+ "shark",
+ "sheep",
+ "shrew",
+ "skunk",
+ "snail",
+ "snake",
+ "sparrow",
+ "spider",
+ "spoonbill",
+ "squid",
+ "squirrel",
+ "starling",
+ "stingray",
+ "stinkbug",
+ "stork",
+ "swallow",
+ "swan",
+ "tapir",
+ "tarsier",
+ "termite",
+ "tiger",
+ "toad",
+ "trout",
+ "turkey",
+ "turtle",
+ "viper",
+ "vulture",
+ "wallaby",
+ "walrus",
+ "wasp",
+ "weasel",
+ "whale",
+ "wildcat",
+ "wolf",
+ "wolverine",
+ "wombat",
+ "woodcock",
+ "woodpecker",
+ "worm",
+ "wren",
+ "yak",
+ "zebra",
+]
+
+adjectives = set(
+ [
+ "abandoned",
+ "able",
+ "absolute",
+ "adorable",
+ "adventurous",
+ "academic",
+ "acceptable",
+ "acclaimed",
+ "accomplished",
+ "accurate",
+ "aching",
+ "acidic",
+ "acrobatic",
+ "active",
+ "actual",
+ "adept",
+ "admirable",
+ "admired",
+ "adolescent",
+ "adorable",
+ "adored",
+ "advanced",
+ "afraid",
+ "affectionate",
+ "aged",
+ "aggravating",
+ "aggressive",
+ "agile",
+ "agitated",
+ "agonizing",
+ "agreeable",
+ "ajar",
+ "alarmed",
+ "alarming",
+ "alert",
+ "alienated",
+ "alive",
+ "all",
+ "altruistic",
+ "amazing",
+ "ambitious",
+ "ample",
+ "amused",
+ "amusing",
+ "anchored",
+ "ancient",
+ "angelic",
+ "angry",
+ "anguished",
+ "animated",
+ "annual",
+ "another",
+ "antique",
+ "anxious",
+ "any",
+ "apprehensive",
+ "appropriate",
+ "apt",
+ "arctic",
+ "arid",
+ "aromatic",
+ "artistic",
+ "ashamed",
+ "assured",
+ "astonishing",
+ "athletic",
+ "attached",
+ "attentive",
+ "attractive",
+ "austere",
+ "authentic",
+ "authorized",
+ "automatic",
+ "avaricious",
+ "average",
+ "aware",
+ "awesome",
+ "awful",
+ "awkward",
+ "babyish",
+ "bad",
+ "back",
+ "baggy",
+ "bare",
+ "barren",
+ "basic",
+ "beautiful",
+ "belated",
+ "beloved",
+ "beneficial",
+ "better",
+ "best",
+ "bewitched",
+ "big",
+ "big-hearted",
+ "biodegradable",
+ "bite-sized",
+ "bitter",
+ "black",
+ "black-and-white",
+ "bland",
+ "blank",
+ "blaring",
+ "bleak",
+ "blind",
+ "blissful",
+ "blond",
+ "blue",
+ "blushing",
+ "bogus",
+ "boiling",
+ "bold",
+ "bony",
+ "boring",
+ "bossy",
+ "both",
+ "bouncy",
+ "bountiful",
+ "bowed",
+ "brave",
+ "breakable",
+ "brief",
+ "bright",
+ "brilliant",
+ "brisk",
+ "broken",
+ "bronze",
+ "brown",
+ "bruised",
+ "bubbly",
+ "bulky",
+ "bumpy",
+ "buoyant",
+ "burdensome",
+ "burly",
+ "bustling",
+ "busy",
+ "buttery",
+ "buzzing",
+ "calculating",
+ "calm",
+ "candid",
+ "canine",
+ "capital",
+ "carefree",
+ "careful",
+ "careless",
+ "caring",
+ "cautious",
+ "cavernous",
+ "celebrated",
+ "charming",
+ "cheap",
+ "cheerful",
+ "cheery",
+ "chief",
+ "chilly",
+ "chubby",
+ "circular",
+ "classic",
+ "clean",
+ "clear",
+ "clear-cut",
+ "clever",
+ "close",
+ "closed",
+ "cloudy",
+ "clueless",
+ "clumsy",
+ "cluttered",
+ "coarse",
+ "cold",
+ "colorful",
+ "colorless",
+ "colossal",
+ "comfortable",
+ "common",
+ "compassionate",
+ "competent",
+ "complete",
+ "complex",
+ "complicated",
+ "composed",
+ "concerned",
+ "concrete",
+ "confused",
+ "conscious",
+ "considerate",
+ "constant",
+ "content",
+ "conventional",
+ "cooked",
+ "cool",
+ "cooperative",
+ "coordinated",
+ "corny",
+ "corrupt",
+ "costly",
+ "courageous",
+ "courteous",
+ "crafty",
+ "crazy",
+ "creamy",
+ "creative",
+ "creepy",
+ "criminal",
+ "crisp",
+ "critical",
+ "crooked",
+ "crowded",
+ "cruel",
+ "crushing",
+ "cuddly",
+ "cultivated",
+ "cultured",
+ "cumbersome",
+ "curly",
+ "curvy",
+ "cute",
+ "cylindrical",
+ "damaged",
+ "damp",
+ "dangerous",
+ "dapper",
+ "daring",
+ "darling",
+ "dark",
+ "dazzling",
+ "dead",
+ "deadly",
+ "deafening",
+ "dear",
+ "dearest",
+ "decent",
+ "decimal",
+ "decisive",
+ "deep",
+ "defenseless",
+ "defensive",
+ "defiant",
+ "deficient",
+ "definite",
+ "definitive",
+ "delayed",
+ "delectable",
+ "delicious",
+ "delightful",
+ "delirious",
+ "demanding",
+ "dense",
+ "dental",
+ "dependable",
+ "dependent",
+ "descriptive",
+ "deserted",
+ "detailed",
+ "determined",
+ "devoted",
+ "different",
+ "difficult",
+ "digital",
+ "diligent",
+ "dim",
+ "dimpled",
+ "dimwitted",
+ "direct",
+ "disastrous",
+ "discrete",
+ "disfigured",
+ "disgusting",
+ "disloyal",
+ "dismal",
+ "distant",
+ "downright",
+ "dreary",
+ "dirty",
+ "disguised",
+ "dishonest",
+ "dismal",
+ "distant",
+ "distinct",
+ "distorted",
+ "dizzy",
+ "dopey",
+ "doting",
+ "double",
+ "downright",
+ "drab",
+ "drafty",
+ "dramatic",
+ "dreary",
+ "droopy",
+ "dry",
+ "dual",
+ "dull",
+ "dutiful",
+ "each",
+ "eager",
+ "earnest",
+ "early",
+ "easy",
+ "easy-going",
+ "ecstatic",
+ "edible",
+ "educated",
+ "elaborate",
+ "elastic",
+ "elated",
+ "elderly",
+ "electric",
+ "elegant",
+ "elementary",
+ "elliptical",
+ "embarrassed",
+ "embellished",
+ "eminent",
+ "emotional",
+ "empty",
+ "enchanted",
+ "enchanting",
+ "energetic",
+ "enlightened",
+ "enormous",
+ "enraged",
+ "entire",
+ "envious",
+ "equal",
+ "equatorial",
+ "essential",
+ "esteemed",
+ "ethical",
+ "euphoric",
+ "even",
+ "evergreen",
+ "everlasting",
+ "every",
+ "evil",
+ "exalted",
+ "excellent",
+ "exemplary",
+ "exhausted",
+ "excitable",
+ "excited",
+ "exciting",
+ "exotic",
+ "expensive",
+ "experienced",
+ "expert",
+ "extraneous",
+ "extroverted",
+ "extra-large",
+ "extra-small",
+ "fabulous",
+ "failing",
+ "faint",
+ "fair",
+ "faithful",
+ "fake",
+ "false",
+ "familiar",
+ "famous",
+ "fancy",
+ "fantastic",
+ "far",
+ "faraway",
+ "far-flung",
+ "far-off",
+ "fast",
+ "fat",
+ "fatal",
+ "fatherly",
+ "favorable",
+ "favorite",
+ "fearful",
+ "fearless",
+ "feisty",
+ "feline",
+ "female",
+ "feminine",
+ "few",
+ "fickle",
+ "filthy",
+ "fine",
+ "finished",
+ "firm",
+ "first",
+ "firsthand",
+ "fitting",
+ "fixed",
+ "flaky",
+ "flamboyant",
+ "flashy",
+ "flat",
+ "flawed",
+ "flawless",
+ "flickering",
+ "flimsy",
+ "flippant",
+ "flowery",
+ "fluffy",
+ "fluid",
+ "flustered",
+ "focused",
+ "fond",
+ "foolhardy",
+ "foolish",
+ "forceful",
+ "forked",
+ "formal",
+ "forsaken",
+ "forthright",
+ "fortunate",
+ "fragrant",
+ "frail",
+ "frank",
+ "frayed",
+ "free",
+ "French",
+ "fresh",
+ "frequent",
+ "friendly",
+ "frightened",
+ "frightening",
+ "frigid",
+ "frilly",
+ "frizzy",
+ "frivolous",
+ "front",
+ "frosty",
+ "frozen",
+ "frugal",
+ "fruitful",
+ "full",
+ "fumbling",
+ "functional",
+ "funny",
+ "fussy",
+ "fuzzy",
+ "gargantuan",
+ "gaseous",
+ "general",
+ "generous",
+ "gentle",
+ "genuine",
+ "giant",
+ "giddy",
+ "gigantic",
+ "gifted",
+ "giving",
+ "glamorous",
+ "glaring",
+ "glass",
+ "gleaming",
+ "gleeful",
+ "glistening",
+ "glittering",
+ "gloomy",
+ "glorious",
+ "glossy",
+ "glum",
+ "golden",
+ "good",
+ "good-natured",
+ "gorgeous",
+ "graceful",
+ "gracious",
+ "grand",
+ "grandiose",
+ "granular",
+ "grateful",
+ "grave",
+ "gray",
+ "great",
+ "greedy",
+ "green",
+ "gregarious",
+ "grim",
+ "grimy",
+ "gripping",
+ "grizzled",
+ "gross",
+ "grotesque",
+ "grouchy",
+ "grounded",
+ "growing",
+ "growling",
+ "grown",
+ "grubby",
+ "gruesome",
+ "grumpy",
+ "guilty",
+ "gullible",
+ "gummy",
+ "hairy",
+ "half",
+ "handmade",
+ "handsome",
+ "handy",
+ "happy",
+ "happy-go-lucky",
+ "hard",
+ "hard-to-find",
+ "harmful",
+ "harmless",
+ "harmonious",
+ "harsh",
+ "hasty",
+ "hateful",
+ "haunting",
+ "healthy",
+ "heartfelt",
+ "hearty",
+ "heavenly",
+ "heavy",
+ "hefty",
+ "helpful",
+ "helpless",
+ "hidden",
+ "hideous",
+ "high",
+ "high-level",
+ "hilarious",
+ "hoarse",
+ "hollow",
+ "homely",
+ "honest",
+ "honorable",
+ "honored",
+ "hopeful",
+ "horrible",
+ "hospitable",
+ "hot",
+ "huge",
+ "humble",
+ "humiliating",
+ "humming",
+ "humongous",
+ "hungry",
+ "hurtful",
+ "husky",
+ "icky",
+ "icy",
+ "ideal",
+ "idealistic",
+ "identical",
+ "idle",
+ "idiotic",
+ "idolized",
+ "ignorant",
+ "ill",
+ "illegal",
+ "ill-fated",
+ "ill-informed",
+ "illiterate",
+ "illustrious",
+ "imaginary",
+ "imaginative",
+ "immaculate",
+ "immaterial",
+ "immediate",
+ "immense",
+ "impassioned",
+ "impeccable",
+ "impartial",
+ "imperfect",
+ "imperturbable",
+ "impish",
+ "impolite",
+ "important",
+ "impossible",
+ "impractical",
+ "impressionable",
+ "impressive",
+ "improbable",
+ "impure",
+ "inborn",
+ "incomparable",
+ "incompatible",
+ "incomplete",
+ "inconsequential",
+ "incredible",
+ "indelible",
+ "inexperienced",
+ "indolent",
+ "infamous",
+ "infantile",
+ "infatuated",
+ "inferior",
+ "infinite",
+ "informal",
+ "innocent",
+ "insecure",
+ "insidious",
+ "insignificant",
+ "insistent",
+ "instructive",
+ "insubstantial",
+ "intelligent",
+ "intent",
+ "intentional",
+ "interesting",
+ "internal",
+ "international",
+ "intrepid",
+ "ironclad",
+ "irresponsible",
+ "irritating",
+ "itchy",
+ "jaded",
+ "jagged",
+ "jam-packed",
+ "jaunty",
+ "jealous",
+ "jittery",
+ "joint",
+ "jolly",
+ "jovial",
+ "joyful",
+ "joyous",
+ "jubilant",
+ "judicious",
+ "juicy",
+ "jumbo",
+ "junior",
+ "jumpy",
+ "juvenile",
+ "kaleidoscopic",
+ "keen",
+ "key",
+ "kind",
+ "kindhearted",
+ "kindly",
+ "klutzy",
+ "knobby",
+ "knotty",
+ "knowledgeable",
+ "knowing",
+ "known",
+ "kooky",
+ "kosher",
+ "lame",
+ "lanky",
+ "large",
+ "last",
+ "lasting",
+ "late",
+ "lavish",
+ "lawful",
+ "lazy",
+ "leading",
+ "lean",
+ "leafy",
+ "left",
+ "legal",
+ "legitimate",
+ "light",
+ "lighthearted",
+ "likable",
+ "likely",
+ "limited",
+ "limp",
+ "limping",
+ "linear",
+ "lined",
+ "liquid",
+ "little",
+ "live",
+ "lively",
+ "livid",
+ "loathsome",
+ "lone",
+ "lonely",
+ "long",
+ "long-term",
+ "loose",
+ "lopsided",
+ "lost",
+ "loud",
+ "lovable",
+ "lovely",
+ "loving",
+ "low",
+ "loyal",
+ "lucky",
+ "lumbering",
+ "luminous",
+ "lumpy",
+ "lustrous",
+ "luxurious",
+ "mad",
+ "made-up",
+ "magnificent",
+ "majestic",
+ "major",
+ "male",
+ "mammoth",
+ "married",
+ "marvelous",
+ "masculine",
+ "massive",
+ "mature",
+ "meager",
+ "mealy",
+ "mean",
+ "measly",
+ "meaty",
+ "medical",
+ "mediocre",
+ "medium",
+ "meek",
+ "mellow",
+ "melodic",
+ "memorable",
+ "menacing",
+ "merry",
+ "messy",
+ "metallic",
+ "mild",
+ "milky",
+ "mindless",
+ "miniature",
+ "minor",
+ "minty",
+ "miserable",
+ "miserly",
+ "misguided",
+ "misty",
+ "mixed",
+ "modern",
+ "modest",
+ "moist",
+ "monstrous",
+ "monthly",
+ "monumental",
+ "moral",
+ "mortified",
+ "motherly",
+ "motionless",
+ "mountainous",
+ "muddy",
+ "muffled",
+ "multicolored",
+ "mundane",
+ "murky",
+ "mushy",
+ "musty",
+ "muted",
+ "mysterious",
+ "naive",
+ "narrow",
+ "nasty",
+ "natural",
+ "naughty",
+ "nautical",
+ "near",
+ "neat",
+ "necessary",
+ "needy",
+ "negative",
+ "neglected",
+ "negligible",
+ "neighboring",
+ "nervous",
+ "new",
+ "next",
+ "nice",
+ "nifty",
+ "nimble",
+ "nippy",
+ "nocturnal",
+ "noisy",
+ "nonstop",
+ "normal",
+ "notable",
+ "noted",
+ "noteworthy",
+ "novel",
+ "noxious",
+ "numb",
+ "nutritious",
+ "nutty",
+ "obedient",
+ "obese",
+ "oblong",
+ "oily",
+ "oblong",
+ "obvious",
+ "occasional",
+ "odd",
+ "oddball",
+ "offbeat",
+ "offensive",
+ "official",
+ "old",
+ "old-fashioned",
+ "only",
+ "open",
+ "optimal",
+ "optimistic",
+ "opulent",
+ "orange",
+ "orderly",
+ "organic",
+ "ornate",
+ "ornery",
+ "ordinary",
+ "original",
+ "other",
+ "our",
+ "outlying",
+ "outgoing",
+ "outlandish",
+ "outrageous",
+ "outstanding",
+ "oval",
+ "overcooked",
+ "overdue",
+ "overjoyed",
+ "overlooked",
+ "palatable",
+ "pale",
+ "paltry",
+ "parallel",
+ "parched",
+ "partial",
+ "passionate",
+ "past",
+ "pastel",
+ "peaceful",
+ "peppery",
+ "perfect",
+ "perfumed",
+ "periodic",
+ "perky",
+ "personal",
+ "pertinent",
+ "pesky",
+ "pessimistic",
+ "petty",
+ "phony",
+ "physical",
+ "piercing",
+ "pink",
+ "pitiful",
+ "plain",
+ "plaintive",
+ "plastic",
+ "playful",
+ "pleasant",
+ "pleased",
+ "pleasing",
+ "plump",
+ "plush",
+ "polished",
+ "polite",
+ "political",
+ "pointed",
+ "pointless",
+ "poised",
+ "poor",
+ "popular",
+ "portly",
+ "posh",
+ "positive",
+ "possible",
+ "potable",
+ "powerful",
+ "powerless",
+ "practical",
+ "precious",
+ "present",
+ "prestigious",
+ "pretty",
+ "precious",
+ "previous",
+ "pricey",
+ "prickly",
+ "primary",
+ "prime",
+ "pristine",
+ "private",
+ "prize",
+ "probable",
+ "productive",
+ "profitable",
+ "profuse",
+ "proper",
+ "proud",
+ "prudent",
+ "punctual",
+ "pungent",
+ "puny",
+ "pure",
+ "purple",
+ "pushy",
+ "putrid",
+ "puzzled",
+ "puzzling",
+ "quaint",
+ "qualified",
+ "quarrelsome",
+ "quarterly",
+ "queasy",
+ "querulous",
+ "questionable",
+ "quick",
+ "quick-witted",
+ "quiet",
+ "quintessential",
+ "quirky",
+ "quixotic",
+ "quizzical",
+ "radiant",
+ "ragged",
+ "rapid",
+ "rare",
+ "rash",
+ "raw",
+ "recent",
+ "reckless",
+ "rectangular",
+ "ready",
+ "real",
+ "realistic",
+ "reasonable",
+ "red",
+ "reflecting",
+ "regal",
+ "regular",
+ "reliable",
+ "relieved",
+ "remarkable",
+ "remorseful",
+ "remote",
+ "repentant",
+ "required",
+ "respectful",
+ "responsible",
+ "repulsive",
+ "revolving",
+ "rewarding",
+ "rich",
+ "rigid",
+ "right",
+ "ringed",
+ "ripe",
+ "roasted",
+ "robust",
+ "rosy",
+ "rotating",
+ "rotten",
+ "rough",
+ "round",
+ "rowdy",
+ "royal",
+ "rubbery",
+ "rundown",
+ "ruddy",
+ "rude",
+ "runny",
+ "rural",
+ "rusty",
+ "sad",
+ "safe",
+ "salty",
+ "same",
+ "sandy",
+ "sane",
+ "sarcastic",
+ "sardonic",
+ "satisfied",
+ "scaly",
+ "scarce",
+ "scared",
+ "scary",
+ "scented",
+ "scholarly",
+ "scientific",
+ "scornful",
+ "scratchy",
+ "scrawny",
+ "second",
+ "secondary",
+ "second-hand",
+ "secret",
+ "self-assured",
+ "self-reliant",
+ "selfish",
+ "sentimental",
+ "separate",
+ "serene",
+ "serious",
+ "serpentine",
+ "several",
+ "severe",
+ "shabby",
+ "shadowy",
+ "shady",
+ "shallow",
+ "shameful",
+ "shameless",
+ "sharp",
+ "shimmering",
+ "shiny",
+ "shocked",
+ "shocking",
+ "shoddy",
+ "short",
+ "short-term",
+ "showy",
+ "shrill",
+ "shy",
+ "sick",
+ "silent",
+ "silky",
+ "silly",
+ "silver",
+ "similar",
+ "simple",
+ "simplistic",
+ "sinful",
+ "single",
+ "sizzling",
+ "skeletal",
+ "skinny",
+ "sleepy",
+ "slight",
+ "slim",
+ "slimy",
+ "slippery",
+ "slow",
+ "slushy",
+ "small",
+ "smart",
+ "smoggy",
+ "smooth",
+ "smug",
+ "snappy",
+ "snarling",
+ "sneaky",
+ "sniveling",
+ "snoopy",
+ "sociable",
+ "soft",
+ "soggy",
+ "solid",
+ "somber",
+ "some",
+ "spherical",
+ "sophisticated",
+ "sore",
+ "sorrowful",
+ "soulful",
+ "soupy",
+ "sour",
+ "Spanish",
+ "sparkling",
+ "sparse",
+ "specific",
+ "spectacular",
+ "speedy",
+ "spicy",
+ "spiffy",
+ "spirited",
+ "spiteful",
+ "splendid",
+ "spotless",
+ "spotted",
+ "spry",
+ "square",
+ "squeaky",
+ "squiggly",
+ "stable",
+ "staid",
+ "stained",
+ "stale",
+ "standard",
+ "starchy",
+ "stark",
+ "starry",
+ "steep",
+ "sticky",
+ "stiff",
+ "stimulating",
+ "stingy",
+ "stormy",
+ "straight",
+ "strange",
+ "steel",
+ "strict",
+ "strident",
+ "striking",
+ "striped",
+ "strong",
+ "studious",
+ "stunning",
+ "stupendous",
+ "stupid",
+ "sturdy",
+ "stylish",
+ "subdued",
+ "submissive",
+ "substantial",
+ "subtle",
+ "suburban",
+ "sudden",
+ "sugary",
+ "sunny",
+ "super",
+ "superb",
+ "superficial",
+ "superior",
+ "supportive",
+ "sure-footed",
+ "surprised",
+ "suspicious",
+ "svelte",
+ "sweaty",
+ "sweet",
+ "sweltering",
+ "swift",
+ "sympathetic",
+ "tall",
+ "talkative",
+ "tame",
+ "tan",
+ "tangible",
+ "tart",
+ "tasty",
+ "tattered",
+ "taut",
+ "tedious",
+ "teeming",
+ "tempting",
+ "tender",
+ "tense",
+ "tepid",
+ "terrible",
+ "terrific",
+ "testy",
+ "thankful",
+ "that",
+ "these",
+ "thick",
+ "thin",
+ "third",
+ "thirsty",
+ "this",
+ "thorough",
+ "thorny",
+ "those",
+ "thoughtful",
+ "threadbare",
+ "thrifty",
+ "thunderous",
+ "tidy",
+ "tight",
+ "timely",
+ "tinted",
+ "tiny",
+ "tired",
+ "torn",
+ "total",
+ "tough",
+ "traumatic",
+ "treasured",
+ "tremendous",
+ "tragic",
+ "trained",
+ "tremendous",
+ "triangular",
+ "tricky",
+ "trifling",
+ "trim",
+ "trivial",
+ "troubled",
+ "true",
+ "trusting",
+ "trustworthy",
+ "trusty",
+ "truthful",
+ "tubby",
+ "turbulent",
+ "twin",
+ "ugly",
+ "ultimate",
+ "unacceptable",
+ "unaware",
+ "uncomfortable",
+ "uncommon",
+ "unconscious",
+ "understated",
+ "unequaled",
+ "uneven",
+ "unfinished",
+ "unfit",
+ "unfolded",
+ "unfortunate",
+ "unhappy",
+ "unhealthy",
+ "uniform",
+ "unimportant",
+ "unique",
+ "united",
+ "unkempt",
+ "unknown",
+ "unlawful",
+ "unlined",
+ "unlucky",
+ "unnatural",
+ "unpleasant",
+ "unrealistic",
+ "unripe",
+ "unruly",
+ "unselfish",
+ "unsightly",
+ "unsteady",
+ "unsung",
+ "untidy",
+ "untimely",
+ "untried",
+ "untrue",
+ "unused",
+ "unusual",
+ "unwelcome",
+ "unwieldy",
+ "unwilling",
+ "unwitting",
+ "unwritten",
+ "upbeat",
+ "upright",
+ "upset",
+ "urban",
+ "usable",
+ "used",
+ "useful",
+ "useless",
+ "utilized",
+ "utter",
+ "vacant",
+ "vague",
+ "vain",
+ "valid",
+ "valuable",
+ "vapid",
+ "variable",
+ "vast",
+ "velvety",
+ "venerated",
+ "vengeful",
+ "verifiable",
+ "vibrant",
+ "vicious",
+ "victorious",
+ "vigilant",
+ "vigorous",
+ "villainous",
+ "violet",
+ "violent",
+ "virtual",
+ "virtuous",
+ "visible",
+ "vital",
+ "vivacious",
+ "vivid",
+ "voluminous",
+ "wan",
+ "warlike",
+ "warm",
+ "warmhearted",
+ "warped",
+ "wary",
+ "wasteful",
+ "watchful",
+ "waterlogged",
+ "watery",
+ "wavy",
+ "wealthy",
+ "weak",
+ "weary",
+ "webbed",
+ "wee",
+ "weekly",
+ "weepy",
+ "weighty",
+ "weird",
+ "welcome",
+ "well-documented",
+ "well-groomed",
+ "well-informed",
+ "well-lit",
+ "well-made",
+ "well-off",
+ "well-to-do",
+ "well-worn",
+ "wet",
+ "which",
+ "whimsical",
+ "whirlwind",
+ "whispered",
+ "white",
+ "whole",
+ "whopping",
+ "wicked",
+ "wide",
+ "wide-eyed",
+ "wiggly",
+ "wild",
+ "willing",
+ "wilted",
+ "winding",
+ "windy",
+ "winged",
+ "wiry",
+ "wise",
+ "witty",
+ "wobbly",
+ "woeful",
+ "wonderful",
+ "wooden",
+ "woozy",
+ "wordy",
+ "worldly",
+ "worn",
+ "worried",
+ "worrisome",
+ "worse",
+ "worst",
+ "worthless",
+ "worthwhile",
+ "worthy",
+ "wrathful",
+ "wretched",
+ "writhing",
+ "wrong",
+ "wry",
+ "yawning",
+ "yearly",
+ "yellow",
+ "yellowish",
+ "young",
+ "youthful",
+ "yummy",
+ "zany",
+ "zealous",
+ "zesty",
+ "zigzag",
+ ]
+)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 7bc00f2..3878975 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -397,7 +397,7 @@ impl PyDataFrameStage {
/// returns the stage ids of that we need to read from in order to execute
#[getter]
- pub fn input_stage_ids(&self) -> PyResult<Vec<usize>> {
+ pub fn child_stage_ids(&self) -> PyResult<Vec<usize>> {
let mut result = vec![];
self.plan
.clone()
diff --git a/src/ray_stage_reader.rs b/src/ray_stage_reader.rs
index 093ed4e..317fdcd 100644
--- a/src/ray_stage_reader.rs
+++ b/src/ray_stage_reader.rs
@@ -111,8 +111,11 @@ impl ExecutionPlan for RayStageReaderExec {
let clients = client_map
.get(&(self.stage_id, partition))
.ok_or(internal_datafusion_err!(
- "No flight clients found for {}",
- self.stage_id
+ "{} No flight clients found for {}:{}, have {:?}",
+ name,
+ self.stage_id,
+ partition,
+ client_map.keys()
))?
.lock()
.iter()
@@ -138,17 +141,20 @@ impl ExecutionPlan for RayStageReaderExec {
let mut streams = vec![];
for mut client in clients {
+ let name = name.clone();
+ trace!("{name} Getting flight stream" );
match client.do_get(ticket.clone()).await {
Ok(flight_stream) => {
+ trace!("{name} Got flight stream. headers:{:?}",
flight_stream.headers());
let rbr_stream =
RecordBatchStreamAdapter::new(schema.clone(),
flight_stream
- .map_err(|e| internal_datafusion_err!("Error
consuming flight stream: {}", e)));
+ .map_err(move |e| internal_datafusion_err!("{}
Error consuming flight stream: {}", name, e)));
streams.push(Box::pin(rbr_stream) as
SendableRecordBatchStream);
},
Err(e) => {
error = true;
- yield internal_err!("Error getting flight stream: {}",
e);
+ yield internal_err!("{} Error getting flight stream:
{}", name, e);
}
}
}
diff --git a/src/stage_service.rs b/src/stage_service.rs
index 040906d..0307a7b 100644
--- a/src/stage_service.rs
+++ b/src/stage_service.rs
@@ -20,15 +20,18 @@ use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
+use arrow::array::RecordBatch;
+use arrow::datatypes::Schema;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use arrow_flight::FlightClient;
use datafusion::common::internal_datafusion_err;
use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_python::utils::wait_for_future;
-use futures::TryStreamExt;
+use futures::{Stream, TryStreamExt};
use local_ip_address::local_ip;
use log::{debug, error, info, trace};
use tokio::net::TcpListener;
@@ -42,11 +45,11 @@ use
arrow_flight::{flight_service_server::FlightServiceServer, Ticket};
use pyo3::prelude::*;
-use parking_lot::Mutex;
+use parking_lot::{Mutex, RwLock};
use tokio::sync::mpsc::{channel, Receiver, Sender};
-use crate::flight::{FlightHandler, FlightServ};
+use crate::flight::{DoGetStream, FlightHandler, FlightServ};
use crate::isolator::PartitionGroup;
use crate::util::{
bytes_to_physical_plan, display_plan_with_partition_counts,
extract_ticket, fix_plan,
@@ -61,45 +64,67 @@ pub(crate) struct ServiceClients(pub HashMap<(usize,
usize), Mutex<Vec<FlightCli
/// StageHandler is a [`FlightHandler`] that serves streams of partitions from
a hosted Physical Plan
/// It only responds to the DoGet Arrow Flight method.
struct StageHandler {
+ /// our name, useful for logging
+ name: String,
+ /// Inner state of the handler
+ inner: RwLock<Option<StageHandlerInner>>,
+}
+
+struct StageHandlerInner {
/// our stage id that we are hosting
pub(crate) stage_id: usize,
/// the physical plan that comprises our stage
- plan: Arc<dyn ExecutionPlan>,
+ pub(crate) plan: Arc<dyn ExecutionPlan>,
/// the session context we will use to execute the plan
- ctx: Mutex<Option<SessionContext>>,
- /// The partitions we will be hosting from this plan.
- partition_group: Vec<usize>,
+ pub(crate) ctx: SessionContext,
}
impl StageHandler {
+ pub fn new(name: String) -> Self {
+ let inner = RwLock::new(None);
+
+ Self { name, inner }
+ }
+ async fn update_plan(
+ &self,
+ stage_id: usize,
+ stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
+ plan: Arc<dyn ExecutionPlan>,
+ partition_group: Vec<usize>,
+ ) -> DFResult<()> {
+ let inner = StageHandlerInner::new(stage_id, stage_addrs, plan,
partition_group).await?;
+ self.inner.write().replace(inner);
+ Ok(())
+ }
+
+ fn stage_id(&self) -> Option<usize> {
+ self.inner.read().as_ref().map(|i| i.stage_id)
+ }
+}
+
+impl StageHandlerInner {
pub async fn new(
stage_id: usize,
- plan_bytes: &[u8],
+ stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
+ plan: Arc<dyn ExecutionPlan>,
partition_group: Vec<usize>,
) -> DFResult<Self> {
- let plan = bytes_to_physical_plan(&SessionContext::new(), plan_bytes)?;
- let plan = fix_plan(plan)?;
- debug!(
- "StageHandler::new [Stage:{}], plan:\n{}",
- stage_id,
- display_plan_with_partition_counts(&plan)
- );
-
- let ctx = Mutex::new(None);
+ let ctx = Self::configure_ctx(stage_id, stage_addrs, &plan,
partition_group).await?;
Ok(Self {
stage_id,
plan,
ctx,
- partition_group,
})
}
async fn configure_ctx(
- &self,
+ stage_id: usize,
stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
- ) -> DFResult<()> {
- let stage_ids_i_need = input_stage_ids(&self.plan)?;
+ plan: &Arc<dyn ExecutionPlan>,
+ partition_group: Vec<usize>,
+ ) -> DFResult<SessionContext> {
+ let stage_ids_i_need = input_stage_ids(&plan)?;
// map of stage_id, partition -> Vec<FlightClient>
let mut client_map = HashMap::new();
@@ -141,7 +166,7 @@ impl StageHandler {
// this only matters if the plan includes an PartitionIsolatorExec,
which looks for this
// for this extension and will be ignored otherwise
- config =
config.with_extension(Arc::new(PartitionGroup(self.partition_group.clone())));
+ config =
config.with_extension(Arc::new(PartitionGroup(partition_group.clone())));
let state = SessionStateBuilder::new()
.with_default_features()
@@ -149,12 +174,33 @@ impl StageHandler {
.build();
let ctx = SessionContext::new_with_state(state);
- self.ctx.lock().replace(ctx);
- trace!("ctx configured for stage {}", self.stage_id);
- Ok(())
+ trace!("ctx configured for stage {}", stage_id);
+
+ Ok(ctx)
}
}
+fn make_stream(
+ inner: &StageHandlerInner,
+ partition: usize,
+) -> Result<impl Stream<Item = Result<RecordBatch, FlightError>> + Send +
'static, Status> {
+ let task_ctx = inner.ctx.task_ctx();
+
+ let stream = inner
+ .plan
+ .execute(partition, task_ctx)
+ .inspect_err(|e| {
+ error!(
+ "{}",
+ format!("Could not get partition stream from plan {e}")
+ )
+ })
+ .map_err(|e| Status::internal(format!("Could not get partition stream
from plan {e}")))?
+ .map_err(|e| FlightError::from_external_error(Box::new(e)));
+
+ Ok(stream)
+}
+
#[async_trait]
impl FlightHandler for StageHandler {
async fn get_stream(
@@ -168,41 +214,33 @@ impl FlightHandler for StageHandler {
let ticket = request.into_inner();
- let partition = extract_ticket(ticket)
- .map_err(|e| Status::internal(format!("Unexpected error extracting
ticket {e}")))?;
+ let partition = extract_ticket(ticket).map_err(|e| {
+ Status::internal(format!(
+ "{}, Unexpected error extracting ticket {e}",
+ self.name
+ ))
+ })?;
trace!(
- "StageService[Stage:{}], request for partition {} from {}",
- self.stage_id,
+ "{}, request for partition {} from {}",
+ self.name,
partition,
remote_addr
);
- let task_ctx = self
- .ctx
- .lock()
- .as_ref()
- .ok_or(Status::internal(format!(
- "Stage [{}] get_stream cannot find ctx",
- self.stage_id
- )))?
- .task_ctx();
-
+ let name = self.name.clone();
let stream = self
- .plan
- .execute(partition, task_ctx)
- .inspect_err(|e| {
- error!(
- "{}",
- format!("Could not get partition stream from plan {e}")
- )
- })
- .map_err(|e| Status::internal(format!("Could not get partition
stream from plan {e}")))?
- .map_err(|e| FlightError::from_external_error(Box::new(e)));
+ .inner
+ .read()
+ .as_ref()
+ .map(|inner| make_stream(inner, partition))
+ .ok_or_else(|| Status::internal(format!("{} No inner found",
&name)))??;
let out_stream = FlightDataEncoderBuilder::new()
.build(stream)
- .map_err(|e| Status::internal(format!("Unexpected error building
stream {e}")));
+ .map_err(move |e| {
+ Status::internal(format!("{} Unexpected error building stream
{e}", name))
+ });
Ok(Response::new(Box::pin(out_stream)))
}
@@ -225,22 +263,15 @@ pub struct StageService {
#[pymethods]
impl StageService {
#[new]
- pub fn new(
- py: Python,
- stage_id: usize,
- plan_bytes: &[u8],
- partition_group: Vec<usize>,
- ) -> PyResult<Self> {
+ pub fn new(name: String) -> PyResult<Self> {
+ let name = format!("[{}]", name);
let listener = None;
let addr = None;
let (all_done_tx, all_done_rx) = channel(1);
let all_done_tx = Arc::new(Mutex::new(all_done_tx));
- let name = format!("StageService[{}]", stage_id);
- let fut = StageHandler::new(stage_id, plan_bytes, partition_group);
-
- let handler = Arc::new(wait_for_future(py, fut).to_py_err()?);
+ let handler = Arc::new(StageHandler::new(name.clone()));
Ok(Self {
name,
@@ -272,33 +303,65 @@ impl StageService {
/// get the address of the listing socket for this service
pub fn addr(&self) -> PyResult<String> {
- self.addr
- .clone()
- .ok_or_else(|| PyErr::new::<pyo3::exceptions::PyException,
_>("Couldn't get addr"))
+ self.addr.clone().ok_or_else(|| {
+ PyErr::new::<pyo3::exceptions::PyException, _>(format!(
+ "{},Couldn't get addr",
+ self.name
+ ))
+ })
}
- pub fn set_stage_addrs<'a>(
- &mut self,
- py: Python<'a>,
- stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
- ) -> PyResult<Bound<'a, PyAny>> {
- let handler = self.handler.clone();
+ /// signal to the service that we can shutdown
+ ///
+ /// returns a python coroutine that should be awaited
+ pub fn all_done<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
+ let sender = self.all_done_tx.lock().clone();
+
let fut = async move {
- handler.configure_ctx(stage_addrs).await.to_py_err()?;
+ sender.send(()).await.to_py_err()?;
Ok(())
};
pyo3_async_runtimes::tokio::future_into_py(py, fut)
}
- /// signal to the service that we can shutdown
+ /// replace the plan that this service was providing, we will do this when
we want
+ /// to reuse the StageService for a subsequent query
+ ///
/// returns a python coroutine that should be awaited
- pub fn all_done<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
- let sender = self.all_done_tx.lock().clone();
+ pub fn update_plan<'a>(
+ &self,
+ py: Python<'a>,
+ stage_id: usize,
+ stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
+ partition_group: Vec<usize>,
+ plan_bytes: &[u8],
+ ) -> PyResult<Bound<'a, PyAny>> {
+ let plan = bytes_to_physical_plan(&SessionContext::new(), plan_bytes)?;
+ debug!(
+ "{} Received New Plan: Stage:{} my addr: {}, partition_group {:?},
stage_addrs:\n{:?}\nplan:\n{}",
+ self.name,
+ stage_id,
+ self.addr()?,
+ partition_group,
+ stage_addrs,
+ display_plan_with_partition_counts(&plan)
+ );
+
+ let handler = self.handler.clone();
+ let name = self.name.clone();
let fut = async move {
- sender.send(()).await.to_py_err()?;
+ handler
+ .update_plan(stage_id, stage_addrs, plan,
partition_group.clone())
+ .await
+ .to_py_err()?;
+ info!(
+ "{} [stage: {} pg:{:?}] updated plan",
+ name, stage_id, partition_group
+ );
Ok(())
};
+
pyo3_async_runtimes::tokio::future_into_py(py, fut)
}
@@ -309,7 +372,7 @@ impl StageService {
let signal = async move {
// TODO: handle Result
- let _ = all_done_rx.recv().await;
+ let result = all_done_rx.recv().await;
};
let service = FlightServ {
@@ -319,11 +382,9 @@ impl StageService {
let svc = FlightServiceServer::new(service);
let listener = self.listener.take().unwrap();
-
let name = self.name.clone();
- let stage_id = self.handler.stage_id;
+
let serv = async move {
- trace!("StageService [{}] Serving", stage_id);
Server::builder()
.add_service(svc)
.serve_with_incoming_shutdown(
@@ -331,9 +392,8 @@ impl StageService {
signal,
)
.await
- .inspect_err(|e| error!("StageService [{}] ERROR serving {e}",
name))
+ .inspect_err(|e| error!("{}, ERROR serving {e}", name))
.map_err(|e| PyErr::new::<pyo3::exceptions::PyException,
_>(format!("{e}")))?;
- info!("tageService [{}] DONE serving", name);
Ok::<(), Box<dyn Error + Send + Sync>>(())
};
diff --git a/tpch/tpc.py b/tpch/tpc.py
index 6c26bf8..8fa5ae8 100644
--- a/tpch/tpc.py
+++ b/tpch/tpc.py
@@ -14,18 +14,28 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
+#
+#
+# This file is useful for running a query against a TPCH dataset.
+#
+# You can run an arbitrary query by passing --query 'select...' or you can run
a
+# TPCH query by passing --qnum 1-22.
import argparse
import ray
-from datafusion import SessionContext, SessionConfig
-from datafusion_ray import RayContext, prettify, runtime_env
-from datetime import datetime
-import json
+from datafusion_ray import RayContext, runtime_env
import os
+import sys
import time
-import duckdb
-from datafusion.object_store import AmazonS3
+try:
+ import duckdb
+except ImportError:
+ print(
+ "duckdb not installed, which is used in this file for retrieving the
TPCH query"
+ )
+ sys.exit(1)
def make_ctx(
@@ -33,6 +43,7 @@ def make_ctx(
concurrency: int,
batch_size: int,
partitions_per_worker: int | None,
+ worker_pool_min: int,
listing_tables: bool,
):
@@ -51,7 +62,11 @@ def make_ctx(
# use ray job submit
ray.init(runtime_env=runtime_env)
- ctx = RayContext(batch_size=batch_size,
partitions_per_worker=partitions_per_worker)
+ ctx = RayContext(
+ batch_size=batch_size,
+ partitions_per_worker=partitions_per_worker,
+ worker_pool_min=worker_pool_min,
+ )
ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
# ctx.set("datafusion.execution.parquet.pushdown_filters", "true")
@@ -75,19 +90,19 @@ def main(
batch_size: int,
query: str,
partitions_per_worker: int | None,
- validate: bool,
+ worker_pool_min: int,
listing_tables,
) -> None:
ctx = make_ctx(
- data_path, concurrency, batch_size, partitions_per_worker,
listing_tables
+ data_path,
+ concurrency,
+ batch_size,
+ partitions_per_worker,
+ worker_pool_min,
+ listing_tables,
)
df = ctx.sql(query)
- for stage in df.stages():
- print(
- f"Stage {stage.stage_id} output
partitions:{stage.num_output_partitions} partition_groups:
{stage.partition_groups}"
- )
- print(stage.execution_plan().display_indent())
-
+ time.sleep(3)
df.show()
@@ -110,7 +125,11 @@ if __name__ == "__main__":
type=int,
help="Max partitions per Stage Service Worker",
)
- parser.add_argument("--validate", action="store_true")
+ parser.add_argument(
+ "--worker-pool-min",
+ type=int,
+ help="Minimum number of RayStages to keep in pool",
+ )
parser.add_argument("--listing-tables", action="store_true")
args = parser.parse_args()
@@ -125,6 +144,6 @@ if __name__ == "__main__":
int(args.batch_size),
query,
args.partitions_per_worker,
- args.validate,
+ args.worker_pool_min,
args.listing_tables,
)
diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py
index 1a93360..f8a9dbc 100644
--- a/tpch/tpcbench.py
+++ b/tpch/tpcbench.py
@@ -24,7 +24,13 @@ import json
import os
import time
-import duckdb
+try:
+ import duckdb
+except ImportError:
+ print(
+ "duckdb not installed, which is used in this file for retrieving the
TPCH query"
+ )
+ sys.exit(1)
def tpch_query(qnum: int) -> str:
@@ -38,6 +44,7 @@ def main(
concurrency: int,
batch_size: int,
partitions_per_worker: int | None,
+ worker_pool_min: int,
listing_tables: bool,
validate: bool,
prefetch_buffer_size: int,
@@ -62,6 +69,7 @@ def main(
batch_size=batch_size,
partitions_per_worker=partitions_per_worker,
prefetch_buffer_size=prefetch_buffer_size,
+ worker_pool_min=worker_pool_min,
)
ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
@@ -189,6 +197,11 @@ if __name__ == "__main__":
type=int,
help="How many batches each stage should eagerly buffer",
)
+ parser.add_argument(
+ "--worker-pool-min",
+ type=int,
+ help="Minimum number of RayStages to keep in pool",
+ )
args = parser.parse_args()
@@ -198,6 +211,7 @@ if __name__ == "__main__":
int(args.concurrency),
int(args.batch_size),
args.partitions_per_worker,
+ args.worker_pool_min,
args.listing_tables,
args.validate,
args.prefetch_buffer_size,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]