This is an automated email from the ASF dual-hosted git repository. skrawcz pushed a commit to branch remove_ray_workflows in repository https://gitbox.apache.org/repos/asf/hamilton.git
commit d91a2ee1702a2433eb602151e60e86de10c12828 Author: Stefan Krawczyk <[email protected]> AuthorDate: Fri Jul 25 23:27:04 2025 -0700 Deletes RayWorkflows Ray workflows are now unsupported. So removing. --- .../graph-adapters/RayWorkflowGraphAdapter.rst | 10 --- docs/reference/graph-adapters/index.rst | 1 - examples/ray/hello_world/README.md | 6 -- examples/ray/hello_world/run_rayworkflow.py | 42 ---------- hamilton/experimental/h_ray.py | 4 +- hamilton/plugins/h_ray.py | 97 ---------------------- plugin_tests/h_ray/test_h_ray_workflow.py | 75 ----------------- 7 files changed, 2 insertions(+), 233 deletions(-) diff --git a/docs/reference/graph-adapters/RayWorkflowGraphAdapter.rst b/docs/reference/graph-adapters/RayWorkflowGraphAdapter.rst deleted file mode 100644 index b302e2f6..00000000 --- a/docs/reference/graph-adapters/RayWorkflowGraphAdapter.rst +++ /dev/null @@ -1,10 +0,0 @@ -============================= -h_ray.RayWorkflowGraphAdapter -============================= -A Graph Adapter for delegating the execution of hamilton nodes to Ray. - - -.. autoclass:: hamilton.plugins.h_ray.RayWorkflowGraphAdapter - :special-members: __init__ - :members: - :inherited-members: diff --git a/docs/reference/graph-adapters/index.rst b/docs/reference/graph-adapters/index.rst index 08c94575..37f84a87 100644 --- a/docs/reference/graph-adapters/index.rst +++ b/docs/reference/graph-adapters/index.rst @@ -20,5 +20,4 @@ Reference DaskGraphAdapter PySparkUDFGraphAdapter RayGraphAdapter - RayWorkflowGraphAdapter SparkKoalasGraphAdapter diff --git a/examples/ray/hello_world/README.md b/examples/ray/hello_world/README.md index f2c40d10..e40c3262 100644 --- a/examples/ray/hello_world/README.md +++ b/examples/ray/hello_world/README.md @@ -12,8 +12,6 @@ File organization: * `data_loaders.py` houses logic to load data for the business_logic.py module. The idea is that you'd swap this module out for other ways of loading data. 
* `run.py` is the script that ties everything together that uses vanilla Ray. -* `run_rayworkflow.py` is the script that again ties everything together, but this time uses -[Ray Workflows](https://docs.ray.io/en/latest/workflows/concepts.html) to execute. # Running the code: For the vanilla Ray implementation use: @@ -23,7 +21,3 @@ For the vanilla Ray implementation use: Here is the visualization of the execution:  - -For the [Ray Workflow](https://docs.ray.io/en/latest/workflows/concepts.html) implementation use: - -> python run_rayworkflow.py diff --git a/examples/ray/hello_world/run_rayworkflow.py b/examples/ray/hello_world/run_rayworkflow.py deleted file mode 100644 index 40c33c5e..00000000 --- a/examples/ray/hello_world/run_rayworkflow.py +++ /dev/null @@ -1,42 +0,0 @@ -import ray -from ray import workflow - -from hamilton import base, driver, log_setup -from hamilton.plugins import h_ray - -if __name__ == "__main__": - log_setup.setup_logging() - workflow.init() - # You can also script module import loading by knowing the module name - # See run.py for an example of doing it that way. - import business_logic - import data_loaders - - modules = [data_loaders, business_logic] - initial_columns = { # could load data here via some other means, or delegate to a module as we have done. - # 'signups': pd.Series([1, 10, 50, 100, 200, 400]), - "signups_location": "some_path", - # 'spend': pd.Series([10, 10, 20, 40, 40, 50]), - "spend_location": "some_other_path", - } - rga = h_ray.RayWorkflowGraphAdapter( - result_builder=base.PandasDataFrameResult(), - # Ray will resume a run if possible based on workflow id - workflow_id="hello-world-123", - ) - dr = driver.Driver(initial_columns, *modules, adapter=rga) - # we need to specify what we want in the final dataframe. - output_columns = [ - "spend", - "signups", - "avg_3wk_spend", - "spend_per_signup", - "spend_zero_mean_unit_variance", - ] - # let's create the dataframe! 
- df = dr.execute(output_columns) - # To visualize do `pip install "sf-hamilton[visualization]"` if you want these to work - # dr.visualize_execution(output_columns, './my_dag.dot', {}) - # dr.display_all_functions('./my_full_dag.dot') - print(df.to_string()) - ray.shutdown() diff --git a/hamilton/experimental/h_ray.py b/hamilton/experimental/h_ray.py index bf870b89..cfc36bf7 100644 --- a/hamilton/experimental/h_ray.py +++ b/hamilton/experimental/h_ray.py @@ -1,6 +1,6 @@ import logging -from hamilton.plugins.h_ray import RayGraphAdapter, RayTaskExecutor, RayWorkflowGraphAdapter +from hamilton.plugins.h_ray import RayGraphAdapter, RayTaskExecutor logger = logging.getLogger(__name__) logger.warning( @@ -8,4 +8,4 @@ logger.warning( " Please use hamilton.plugins.h_ray instead." ) -__all__ = ["RayGraphAdapter", "RayWorkflowGraphAdapter", "RayTaskExecutor"] +__all__ = ["RayGraphAdapter", "RayTaskExecutor"] diff --git a/hamilton/plugins/h_ray.py b/hamilton/plugins/h_ray.py index 66d0a925..0b36d65a 100644 --- a/hamilton/plugins/h_ray.py +++ b/hamilton/plugins/h_ray.py @@ -6,7 +6,6 @@ import time import typing import ray -from ray import workflow from hamilton import base, htypes, lifecycle, node from hamilton.execution import executors @@ -172,102 +171,6 @@ class RayGraphAdapter( ray.shutdown() -class RayWorkflowGraphAdapter(base.HamiltonGraphAdapter, base.ResultMixin): - """Class representing what's required to make Hamilton run Ray Workflows - - Use `pip install sf-hamilton[ray]` to get the dependencies required to run this. - - Ray workflows is a more robust way to scale computation for any type of Hamilton graph. - - What's the difference between this and RayGraphAdapter? - -------------------------------------------------------- - * Ray workflows offer durable computation. That is, they save and checkpoint each function. - * This enables one to run a workflow, and not have to restart it if something fails, assuming correct\ - Ray workflow usage. 
- - Tips - ---- - See https://docs.ray.io/en/latest/workflows/basics.html for the source of the following: - - 1. Functions should be idempotent. - 2. The workflow ID is what Ray uses to try to resume/restart if run a second time. - 3. Nothing is run until the entire DAG is walked and setup and build_result is called. - - Notes on scaling: - ----------------- - - Multi-core on single machine ✅ - - Distributed computation on a Ray cluster ✅ - - Scales to any size of data ⛔️; you are LIMITED by the memory on the instance/computer 💻. - - Function return object types supported: - --------------------------------------- - - Works for any python object that can be serialized by the Ray framework. ✅ - - Pandas? - -------- - - ⛔️ Ray DOES NOT do anything special about Pandas. - - CAVEATS - ------- - - Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it's\ - worth it. - - DISCLAIMER -- this class is experimental, so signature changes are a possibility! - """ - - def __init__(self, result_builder: base.ResultMixin, workflow_id: str): - """Constructor - - :param result_builder: Required. An implementation of base.ResultMixin. - :param workflow_id: Required. An ID to give the ray workflow to identify it for durability purposes. - :param max_retries: Optional. The function will be retried for the given number of times if an - exception is raised. - """ - self.result_builder = result_builder - self.workflow_id = workflow_id - if not self.result_builder: - raise ValueError( - "Error: ResultMixin object required. Please pass one in for `result_builder`." 
- ) - - @staticmethod - def check_input_type(node_type: typing.Type, input_value: typing.Any) -> bool: - # NOTE: the type of a raylet is unknown until they are computed - if isinstance(input_value, ray._raylet.ObjectRef): - return True - return htypes.check_input_type(node_type, input_value) - - @staticmethod - def check_node_type_equivalence(node_type: typing.Type, input_type: typing.Type) -> bool: - return node_type == input_type - - def execute_node(self, node: node.Node, kwargs: typing.Dict[str, typing.Any]) -> typing.Any: - """Function that is called as we walk the graph to determine how to execute a hamilton function. - - :param node: the node from the graph. - :param kwargs: the arguments that should be passed to it. - :return: returns a ray object reference. - """ - ray_options = parse_ray_remote_options_from_tags(node.tags) - return ray.remote(raify(node.callable)).options(**ray_options).bind(**kwargs) - - def build_result(self, **outputs: typing.Dict[str, typing.Any]) -> typing.Any: - """Builds the result and brings it back to this running process. - - :param outputs: the dictionary of key -> Union[ray object reference | value] - :return: The type of object returned by self.result_builder. - """ - if logger.isEnabledFor(logging.DEBUG): - for k, v in outputs.items(): - logger.debug(f"Got output {k}, with type [{type(v)}].") - # need to wrap our result builder in a remote call and then pass in what we want to build from. - remote_combine = ray.remote(self.result_builder.build_result).bind(**outputs) - result = workflow.run( - remote_combine, workflow_id=self.workflow_id - ) # this materializes the object locally - return result - - class RayTaskExecutor(executors.TaskExecutor): """Task executor using Ray for the new task-based execution mechanism in Hamilton. This is still experimental, so the API might change. 
diff --git a/plugin_tests/h_ray/test_h_ray_workflow.py b/plugin_tests/h_ray/test_h_ray_workflow.py deleted file mode 100644 index 050ebcc7..00000000 --- a/plugin_tests/h_ray/test_h_ray_workflow.py +++ /dev/null @@ -1,75 +0,0 @@ -import pandas as pd -import pytest -import ray -from ray import workflow - -from hamilton import base, driver -from hamilton.plugins import h_ray -from plugin_tests.h_ray.resources import example_module, smoke_screen_module - - -@pytest.fixture(scope="module") -def init(): - # Do not need to call ray.init() when using a workflow now it seems? - yield "initialized" - ray.shutdown() - - -# This does not work locally -- will ask Ray slack for support. -def test_ray_workflow_graph_adapter(init): - workflow.init() - initial_columns = { - "signups": pd.Series([1, 10, 50, 100, 200, 400]), - "spend": pd.Series([10, 10, 20, 40, 40, 50]), - } - dr = driver.Driver( - initial_columns, - example_module, - adapter=h_ray.RayWorkflowGraphAdapter( - base.PandasDataFrameResult(), "test-test_ray_workflow_graph_adapter" - ), - ) - output_columns = [ - "spend", - "signups", - "avg_3wk_spend", - "spend_per_signup", - ] - df = dr.execute(output_columns) - assert set(df) == set(output_columns) - expected_column = pd.Series( - [0.0, 0.0, 13.33333, 23.33333, 33.33333, 43.33333], name="avg_3wk_spend" - ) - pd.testing.assert_series_equal( - df.avg_3wk_spend.fillna(0.0), expected_column - ) # fill na to get around NaN - # TODO: do some more asserting?
- - -def test_smoke_screen_module(init): - workflow.init() - config = {"region": "US"} - dr = driver.Driver( - config, - smoke_screen_module, - adapter=h_ray.RayWorkflowGraphAdapter( - base.PandasDataFrameResult(), "test-test_smoke_screen_module" - ), - ) - output_columns = [ - "raw_acquisition_cost", - "pessimistic_net_acquisition_cost", - "neutral_net_acquisition_cost", - "optimistic_net_acquisition_cost", - "series_with_start_date_end_date", - ] - df = dr.execute( - inputs={"date_range": {"start_date": "20200101", "end_date": "20220801"}}, - final_vars=output_columns, - ) - epsilon = 0.00001 - assert abs(df["raw_acquisition_cost"].mean() - 0.393808) < epsilon - assert abs(df["pessimistic_net_acquisition_cost"].mean() - 0.420769) < epsilon - assert abs(df["neutral_net_acquisition_cost"].mean() - 0.405582) < epsilon - assert abs(df["optimistic_net_acquisition_cost"].mean() - 0.399363) < epsilon - assert df["series_with_start_date_end_date"].iloc[0] == "date_20200101_date_20220801"
