This is an automated email from the ASF dual-hosted git repository.
masahi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tvm.git
The following commit(s) were added to refs/heads/main by this push:
new e02bf824d1 [Runtime][PipelineExecutor] Add graph manually splitting
logic into the unit test. (#11334)
e02bf824d1 is described below
commit e02bf824d11019413ed1f8eb78da2b3427b0f026
Author: Hua Jiang <[email protected]>
AuthorDate: Thu May 19 16:51:13 2022 -0700
[Runtime][PipelineExecutor] Add graph manually splitting logic into the
unit test. (#11334)
* [Runtime][PipelineExecutor] Add graph manually splitting example into
the unit test.
The current unit test creates 3 separate modules and then re-connects them to
run the pipeline executor, and this is not a real use case for the pipeline
executor.
This adds manual graph-splitting logic which splits a full network into 3
subgraphs, then runs the pipeline executor and verifies the result to
simulate the real use case.
* address review comments
* trigger build.
* address review comments
* address review comments
* rebase and trigger build.
---
tests/python/relay/test_pipeline_executor.py | 224 ++++++++++++++++++++++++---
1 file changed, 201 insertions(+), 23 deletions(-)
diff --git a/tests/python/relay/test_pipeline_executor.py
b/tests/python/relay/test_pipeline_executor.py
index b97966dde0..541f3bba13 100644
--- a/tests/python/relay/test_pipeline_executor.py
+++ b/tests/python/relay/test_pipeline_executor.py
@@ -22,12 +22,195 @@ import numpy as np
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+def graph_split(expr, split_conf, params=None):
+ """Splitting the graph into a list of subgraphs"""
+
+ def get_dep_var(sub_var_dep):
+ return [var for var in sub_var_dep[len(sub_var_dep) - 1]["ref_nodes"]]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for dep in snode_dep[:-1]:
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] += 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is a free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx += 1
+ else:
+ new_args.append(var)
+ # if the 'tvm.relay.expr.Call' has a free_var, recreate it with new
name as 'data_n_*'.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumurate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraphs can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] += 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ # if a operator name and repeating count in the network
match with the values
+ # of the 'split configuration', then this place is where
we should do the
+ # graph splitting.
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ # Do graph splitting.
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of the current subgraph are the
depedency node of another
+ # subgraph, we need to set them as the output of
current subgraph.
+ body = relay.Tuple(dep_vars) if len(dep_vars) > 1 else
anf.var
+ # when the operator of current subgraph uses previous
subgraph constant
+ # as the argument of a "relay.expr.call", such
constant may become a free
+ # varaible if the constant does not exist in the
current subgraph.
+ # merge the previous constant with current subgraph to
avoid such issue.
+ if constant_expr:
+ ann = merge_constant_expr(constant_expr, ann)
+ ann = run_opt_pass(ann, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ # Return the last node of the current subgraph.
+ return tvm.relay.expr.Let(anf.var, value, body)
+ return tvm.relay.expr.Let(
+ anf.var,
+ value,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ )
+ else:
+ return anf
+
+ snode_dep = [{"nodes": {}, "ref_nodes": {}}]
+ pipeline_mods = []
+ operator_index_map = {}
+ # Used to tracking new input which caused by graph splitting.
+ new_input_idx = 0
+ constant_expr = None
+ subgraph_split_conf = split_conf.copy()
+ # Binding the parameters.
+ if params:
+ expr = build_module.bind_params_by_name(expr, params)
+ anf = run_opt_pass(expr, transform.ToANormalForm())
+ anf = run_opt_pass(anf, transform.InferType())
+ ann = _recursion(
+ anf,
+ pipeline_mods,
+ subgraph_split_conf,
+ constant_expr,
+ )
+ ann = run_opt_pass(ann.body, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ return pipeline_mods
+
+
+def get_network():
+ # Get a list of modules representing subgraphs.
+ mods = []
+ dshape = (3, 3)
+ data = relay.var("data_0", relay.TensorType(dshape, "float32"))
+ data21 = relay.var("data_1", relay.TensorType(dshape, "float32"))
+ data_net1_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ data_net1_output_2 = relay.var("data_1", relay.TensorType(dshape,
"float32"))
+ data_net2_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ mvalue1 = np.full((1), 1).astype("float32")
+ mvalue2 = np.full((1), 2).astype("float32")
+ mvalue3 = np.full((1), 3).astype("float32")
+ mv1 = relay.Constant(tvm.nd.array(mvalue1))
+ mv2 = relay.Constant(tvm.nd.array(mvalue2))
+ mv3 = relay.Constant(tvm.nd.array(mvalue3))
+ # There are three outputs in the first model.
+ net1_output1 = relay.add(data, mv1)
+ net1_output2 = relay.subtract(data, mv2)
+ net1_output3 = relay.concatenate((net1_output1, net1_output2), axis=0)
+ (net1_output3, _) = relay.split(net1_output3, indices_or_sections=2,
axis=0)
+ net1_output3 = relay.add(net1_output3, mv2)
+ # The second model uses the output named net1_output3 of the first model
as the first input,
+ # the second input of the second model is data21.
+ net2 = relay.add(net1_output3, mv2)
+ net2 = relay.add(net2, data21)
+ net2_output = relay.add(net2, mv3)
+ # The third model uses the output named net2_output of the second model as
the first input
+ # and uses the output named net1_output2 of the first model as the second
input.
+ net3 = relay.multiply(net2_output, mv3)
+ net3 = relay.add(net3, net1_output2)
+ return tvm.IRModule.from_expr(relay.Function([data, data21],
relay.Tuple([net3]))), dshape
+
+
+def get_split_mod():
+ mod, dshape = get_network()
+ split_conf = [{"op_name": "add", "op_index": 1}, {"op_name": "add",
"op_index": 4}]
+ mods = graph_split(mod["main"], split_conf)
+ return mods, dshape
+
+
def get_mannual_mod():
# Get a list of modules representing subgraphs.
mods = []
@@ -83,9 +266,8 @@ def get_manual_conf(mods, target):
"mod_idx": 0,
"cpu_affinity": "0",
"output": [
- {"output_idx": 0, "dependencies": [{"mod_idx": 1, "input_name":
"data_0"}]},
- {"output_idx": 1, "dependencies": [{"mod_idx": 2, "input_name":
"data_0"}]},
- {"output_idx": 2, "dependencies": [{"global_output_index": 0}]},
+ {"output_idx": 0, "dependencies": [{"mod_idx": 1, "input_name":
"data_n_0"}]},
+ {"output_idx": 1, "dependencies": [{"mod_idx": 2, "input_name":
"data_n_2"}]},
],
}
mod_config[mods[0]] = {
@@ -103,7 +285,7 @@ def get_manual_conf(mods, target):
"mod_idx": 1,
"cpu_affinity": "0",
"output": [
- {"output_idx": 0, "dependencies": [{"mod_idx": 2, "input_name":
"data_1"}]},
+ {"output_idx": 0, "dependencies": [{"mod_idx": 2, "input_name":
"data_n_1"}]},
],
}
mod_config[mods[1]] = {
@@ -120,7 +302,7 @@ def get_manual_conf(mods, target):
pipe_config3 = {
"mod_idx": 2,
"cpu_affinity": "0",
- "output": [{"output_idx": 0, "dependencies": [{"global_output_index":
1}]}],
+ "output": [{"output_idx": 0, "dependencies": [{"global_output_index":
0}]}],
}
mod_config[mods[2]] = {
"pipeline": pipe_config3,
@@ -222,7 +404,7 @@ def test_pipe_runtime_error_check():
# This function is used to trigger runtime error by applying wrong logic.
if pipeline_executor_build.pipeline_executor_build_enabled():
# Get three pipeline modules here.
- (mod1, mod2, mod3), dshape = get_mannual_mod()
+ (mod1, mod2, mod3), dshape = get_split_mod()
# The input or output name is illegal and expects a runtime error.
pipe_error = pipeline_executor_build.PipelineConfig()
@@ -283,7 +465,7 @@ def test_pipeline():
for target in target_list:
affinity = os.sched_getaffinity(0)
# Get the three pipeline modules here.
- (mod1, mod2, mod3), dshape = get_mannual_mod()
+ (mod1, mod2, mod3), dshape = get_split_mod()
# Prepare batch data for pipeline computation.
datas = []
@@ -305,33 +487,29 @@ def test_pipeline():
pipe_config["input"]["data_b"].connect(pipe_config[mod2]["input"]["data_1"])
# The mod1 output[0] will be connected to a input named "data_0"
of mod2.
-
pipe_config[mod1]["output"][0].connect(pipe_config[mod2]["input"]["data_0"])
+
pipe_config[mod1]["output"][0].connect(pipe_config[mod2]["input"]["data_n_0"])
# The mod1 output[1] will be connected to a input named "data_0"
of mod3.
-
pipe_config[mod1]["output"][1].connect(pipe_config[mod3]["input"]["data_0"])
+
pipe_config[mod1]["output"][1].connect(pipe_config[mod3]["input"]["data_n_2"])
# The mod2 output[2] will be connected to a input named "data_1"
of mod3.
-
pipe_config[mod2]["output"][0].connect(pipe_config[mod3]["input"]["data_1"])
-
- # The mod1 output[2] will be connected to pipeline output[0].
- pipe_config[mod1]["output"][2].connect(pipe_config["output"]["0"])
+
pipe_config[mod2]["output"][0].connect(pipe_config[mod3]["input"]["data_n_1"])
- # The mod3 output[0] will be connected to pipeline output[1].
- pipe_config[mod3]["output"][0].connect(pipe_config["output"]["1"])
- # Print configueration (print(pipe_config)), the result looks like
following.
+ # The mod3 output[0] will be connected to pipeline output[0].
+ pipe_config[mod3]["output"][0].connect(pipe_config["output"]["0"])
+ # Print configuration (print(pipe_config)), the result looks like
following.
#
# Inputs
# |data_a: mod1:data_0
# |data_b: mod2:data_1
#
# output
- # |output(1) : mod1.output(2)
- # |output(2) : mod3.output(0)
+ # |output(1) : mod3.output(0)
#
# connections
- # |mod1.output(0)-> mod2.data_0
- # |mod1.output(1)-> mod3.data_0
- # |mod2.output(0)-> mod3.data_1
+ # |mod1.output(0)-> mod2.data_n_0
+ # |mod1.output(1)-> mod3.data_n_2
+ # |mod2.output(0)-> mod3.data_n_1
# Set other parameters.
pipe_config[mod1].target = target[0]
@@ -367,7 +545,7 @@ def test_pipeline():
# Use the import function to create and initialize PipelineModule.
pipeline_module_test =
pipeline_executor.PipelineModule.load_library(config_file_name)
- assert pipeline_module_test.num_outputs == 2
+ assert pipeline_module_test.num_outputs == 1
input_map = pipeline_module_test.get_input_pipeline_map("data_b")
assert input_map[0] == "1" and input_map[1] == "data_1"