SebastianBoblestETAS commented on code in PR #11334:
URL: https://github.com/apache/tvm/pull/11334#discussion_r875461735
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
Review Comment:
```suggestion
# Enumerate all operators of compute graph, then split the compute
graph into a group of
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of current subgraph are the depedency
node of other
Review Comment:
```suggestion
# When the nodes of the current subgraph are the
dependency node of another
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
Review Comment:
```suggestion
# record the constant expr to make sure all subgraphs can find
correct constant.
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of current subgraph are the depedency
node of other
+ # subgraph, we need to set them as the output of
current subgraph.
+ body = relay.Tuple(dep_vars) if len(dep_vars) > 1 else
anf.var
+ # when current subgraph use previous subgraph constant,
+ # such constant may become free varaible due to the
constant
Review Comment:
```suggestion
# such constant may become a free variable if the
constant does not exist.
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of current subgraph are the depedency
node of other
+ # subgraph, we need to set them as the output of
current subgraph.
+ body = relay.Tuple(dep_vars) if len(dep_vars) > 1 else
anf.var
+ # when current subgraph use previous subgraph constant,
+ # such constant may become free varaible due to the
constant
+ # not exist, merge the previous constant with current
subgraph
+ # to avoid such issue.
+ if constant_expr:
+ ann = merge_constant_expr(constant_expr, ann)
+ ann = run_opt_pass(ann, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ # Return the last node of the current subgraph.
+ return tvm.relay.expr.Let(anf.var, value, body)
+ return tvm.relay.expr.Let(
+ anf.var,
+ value,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ )
+ else:
+ return anf
+
+ snode_dep = [{"nodes": {}, "ref_nodes": {}}]
+ pipeline_mods = []
+ operator_index_map = {}
+ # Used to tracking new input which caused by graph splitting.
+ new_input_idx = 0
+ constant_expr = None
+ subgraph_split_conf = split_conf.copy()
+ # Binding the parameters.
+ if params:
+ expr = build_module.bind_params_by_name(expr, params)
+ anf = run_opt_pass(expr, transform.ToANormalForm())
+ anf = run_opt_pass(anf, transform.InferType())
+ ann = _recursion(
+ anf,
+ pipeline_mods,
+ subgraph_split_conf,
+ constant_expr,
+ )
+ ann = run_opt_pass(ann.body, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ return pipeline_mods
+
+
+def get_network():
+ # Get a list of modules representing subgraphs.
+ mods = []
+ dshape = (3, 3)
+ data = relay.var("data_0", relay.TensorType(dshape, "float32"))
+ data21 = relay.var("data_1", relay.TensorType(dshape, "float32"))
+ data_net1_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ data_net1_output_2 = relay.var("data_1", relay.TensorType(dshape,
"float32"))
+ data_net2_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ mvalue1 = np.full((1), 1).astype("float32")
+ mvalue2 = np.full((1), 2).astype("float32")
+ mvalue3 = np.full((1), 3).astype("float32")
+ mv1 = relay.Constant(tvm.nd.array(mvalue1))
+ mv2 = relay.Constant(tvm.nd.array(mvalue2))
+ mv3 = relay.Constant(tvm.nd.array(mvalue3))
+ # There are three outputs in the first model.
+ net1_output1 = relay.add(data, mv1)
+ net1_output2 = relay.subtract(data, mv2)
+ net1_output3 = relay.concatenate((net1_output1, net1_output2), axis=0)
+ (net1_output3, _) = relay.split(net1_output3, indices_or_sections=2,
axis=0)
+ net1_output3 = relay.add(net1_output3, mv2)
+ # The second model use output named net1_output1 of the first model as the
first input,
+ # the second input of the second model is data21.
+ net2 = relay.add(net1_output3, mv2)
+ net2 = relay.add(net2, data21)
+ net2_output = relay.add(net2, mv3)
+ # The third model use the output named net2_output of the second model as
the first input
Review Comment:
```suggestion
# The third model uses the output named net2_output of the second model
as the first input
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
Review Comment:
```suggestion
return [var for var in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"]]
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
Review Comment:
```suggestion
"""Splitting graph into a list of subgraphs"""
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
Review Comment:
```suggestion
operator_index_map[value.op.name] += 1
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of current subgraph are the depedency
node of other
+ # subgraph, we need to set them as the output of
current subgraph.
+ body = relay.Tuple(dep_vars) if len(dep_vars) > 1 else
anf.var
+ # when current subgraph use previous subgraph constant,
+ # such constant may become free varaible due to the
constant
Review Comment:
I do not fully understand this comment.
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
Review Comment:
This could be made a docstring of graph_split.
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
Review Comment:
```suggestion
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of current subgraph are the depedency
node of other
+ # subgraph, we need to set them as the output of
current subgraph.
+ body = relay.Tuple(dep_vars) if len(dep_vars) > 1 else
anf.var
+ # when current subgraph use previous subgraph constant,
+ # such constant may become free varaible due to the
constant
+ # not exist, merge the previous constant with current
subgraph
+ # to avoid such issue.
+ if constant_expr:
+ ann = merge_constant_expr(constant_expr, ann)
+ ann = run_opt_pass(ann, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ # Return the last node of the current subgraph.
+ return tvm.relay.expr.Let(anf.var, value, body)
+ return tvm.relay.expr.Let(
+ anf.var,
+ value,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ )
+ else:
+ return anf
+
+ snode_dep = [{"nodes": {}, "ref_nodes": {}}]
+ pipeline_mods = []
+ operator_index_map = {}
+ # Used to tracking new input which caused by graph splitting.
+ new_input_idx = 0
+ constant_expr = None
+ subgraph_split_conf = split_conf.copy()
+ # Binding the parameters.
+ if params:
+ expr = build_module.bind_params_by_name(expr, params)
+ anf = run_opt_pass(expr, transform.ToANormalForm())
+ anf = run_opt_pass(anf, transform.InferType())
+ ann = _recursion(
+ anf,
+ pipeline_mods,
+ subgraph_split_conf,
+ constant_expr,
+ )
+ ann = run_opt_pass(ann.body, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ return pipeline_mods
+
+
+def get_network():
+ # Get a list of modules representing subgraphs.
+ mods = []
+ dshape = (3, 3)
+ data = relay.var("data_0", relay.TensorType(dshape, "float32"))
+ data21 = relay.var("data_1", relay.TensorType(dshape, "float32"))
+ data_net1_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ data_net1_output_2 = relay.var("data_1", relay.TensorType(dshape,
"float32"))
+ data_net2_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ mvalue1 = np.full((1), 1).astype("float32")
+ mvalue2 = np.full((1), 2).astype("float32")
+ mvalue3 = np.full((1), 3).astype("float32")
+ mv1 = relay.Constant(tvm.nd.array(mvalue1))
+ mv2 = relay.Constant(tvm.nd.array(mvalue2))
+ mv3 = relay.Constant(tvm.nd.array(mvalue3))
+ # There are three outputs in the first model.
+ net1_output1 = relay.add(data, mv1)
+ net1_output2 = relay.subtract(data, mv2)
+ net1_output3 = relay.concatenate((net1_output1, net1_output2), axis=0)
+ (net1_output3, _) = relay.split(net1_output3, indices_or_sections=2,
axis=0)
+ net1_output3 = relay.add(net1_output3, mv2)
+ # The second model use output named net1_output1 of the first model as the
first input,
+ # the second input of the second model is data21.
+ net2 = relay.add(net1_output3, mv2)
+ net2 = relay.add(net2, data21)
+ net2_output = relay.add(net2, mv3)
+ # The third model use the output named net2_output of the second model as
the first input
+ # and use the output named net1_output2 of the first model as the second
input.
Review Comment:
```suggestion
# and uses the output named net1_output2 of the first model as the
second input.
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -305,33 +485,29 @@ def test_pipeline():
pipe_config["input"]["data_b"].connect(pipe_config[mod2]["input"]["data_1"])
# The mod1 output[0] will be connected to a input named "data_0"
of mod2.
-
pipe_config[mod1]["output"][0].connect(pipe_config[mod2]["input"]["data_0"])
+
pipe_config[mod1]["output"][0].connect(pipe_config[mod2]["input"]["data_n_0"])
# The mod1 output[1] will be connected to a input named "data_0"
of mod3.
-
pipe_config[mod1]["output"][1].connect(pipe_config[mod3]["input"]["data_0"])
+
pipe_config[mod1]["output"][1].connect(pipe_config[mod3]["input"]["data_n_2"])
# The mod2 output[2] will be connected to a input named "data_1"
of mod3.
-
pipe_config[mod2]["output"][0].connect(pipe_config[mod3]["input"]["data_1"])
-
- # The mod1 output[2] will be connected to pipeline output[0].
- pipe_config[mod1]["output"][2].connect(pipe_config["output"]["0"])
+
pipe_config[mod2]["output"][0].connect(pipe_config[mod3]["input"]["data_n_1"])
- # The mod3 output[0] will be connected to pipeline output[1].
- pipe_config[mod3]["output"][0].connect(pipe_config["output"]["1"])
+ # The mod3 output[0] will be connected to pipeline output[0].
+ pipe_config[mod3]["output"][0].connect(pipe_config["output"]["0"])
# Print configueration (print(pipe_config)), the result looks like
following.
Review Comment:
```suggestion
# Print configuration (print(pipe_config)), the result looks
like the following:
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
Review Comment:
```suggestion
# if the call has a free_var, recreate it.
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
Review Comment:
This is a string comparison; is this intended?
split_operator_index sounds more like an integer.
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of current subgraph are the depedency
node of other
+ # subgraph, we need to set them as the output of
current subgraph.
+ body = relay.Tuple(dep_vars) if len(dep_vars) > 1 else
anf.var
+ # when current subgraph use previous subgraph constant,
+ # such constant may become free varaible due to the
constant
+ # not exist, merge the previous constant with current
subgraph
+ # to avoid such issue.
+ if constant_expr:
+ ann = merge_constant_expr(constant_expr, ann)
+ ann = run_opt_pass(ann, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ # Return the last node of the current subgraph.
+ return tvm.relay.expr.Let(anf.var, value, body)
+ return tvm.relay.expr.Let(
+ anf.var,
+ value,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ )
+ else:
+ return anf
+
+ snode_dep = [{"nodes": {}, "ref_nodes": {}}]
+ pipeline_mods = []
+ operator_index_map = {}
+ # Used to tracking new input which caused by graph splitting.
+ new_input_idx = 0
+ constant_expr = None
+ subgraph_split_conf = split_conf.copy()
+ # Binding the parameters.
+ if params:
+ expr = build_module.bind_params_by_name(expr, params)
+ anf = run_opt_pass(expr, transform.ToANormalForm())
+ anf = run_opt_pass(anf, transform.InferType())
+ ann = _recursion(
+ anf,
+ pipeline_mods,
+ subgraph_split_conf,
+ constant_expr,
+ )
+ ann = run_opt_pass(ann.body, transform.ToGraphNormalForm())
+ mod = tvm.IRModule.from_expr(ann)
+ pipeline_mods.insert(0, mod)
+ return pipeline_mods
+
+
+def get_network():
+ # Get a list of modules representing subgraphs.
+ mods = []
+ dshape = (3, 3)
+ data = relay.var("data_0", relay.TensorType(dshape, "float32"))
+ data21 = relay.var("data_1", relay.TensorType(dshape, "float32"))
+ data_net1_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ data_net1_output_2 = relay.var("data_1", relay.TensorType(dshape,
"float32"))
+ data_net2_output_1 = relay.var("data_0", relay.TensorType(dshape,
"float32"))
+ mvalue1 = np.full((1), 1).astype("float32")
+ mvalue2 = np.full((1), 2).astype("float32")
+ mvalue3 = np.full((1), 3).astype("float32")
+ mv1 = relay.Constant(tvm.nd.array(mvalue1))
+ mv2 = relay.Constant(tvm.nd.array(mvalue2))
+ mv3 = relay.Constant(tvm.nd.array(mvalue3))
+ # There are three outputs in the first model.
+ net1_output1 = relay.add(data, mv1)
+ net1_output2 = relay.subtract(data, mv2)
+ net1_output3 = relay.concatenate((net1_output1, net1_output2), axis=0)
+ (net1_output3, _) = relay.split(net1_output3, indices_or_sections=2,
axis=0)
+ net1_output3 = relay.add(net1_output3, mv2)
+ # The second model use output named net1_output1 of the first model as the
first input,
Review Comment:
```suggestion
# The second model uses the output named net1_output1 of the first model
as the first input,
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
Review Comment:
```suggestion
# merge constant expression with an expression
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
+ else:
+ new_args.append(var)
+ # if the call have a free_var, recreate it.
+ if need_update:
+ value = tvm.relay.expr.Call(
+ value.op, new_args, value.attrs, value.type_args, value.span
+ )
+ return value, snode_dep, new_input_idx
+
+ def merge_constant_expr(constant_expr, expr):
+ # merge constant express with a express
+ if not isinstance(constant_expr.body, tvm.relay.expr.Let):
+ return tvm.relay.expr.Let(constant_expr.var, constant_expr.value,
expr)
+
+ return tvm.relay.expr.Let(
+ constant_expr.var, constant_expr.value,
merge_constant_expr(constant_expr.body, expr)
+ )
+
+ def _recursion(anf, pipeline_mods, split_conf, constant_expr):
+ # Enumrate all operators of compute graph, then split the compute
graph into a group of
+ # subgraph.
+ nonlocal operator_index_map
+ nonlocal new_input_idx
+ nonlocal snode_dep
+ cur_node_dep = snode_dep[len(snode_dep) - 1]
+ if isinstance(anf, tvm.relay.Function):
+ return tvm.relay.Function(
+ anf.params,
+ _recursion(anf.body, pipeline_mods, split_conf, constant_expr),
+ anf.ret_type,
+ anf.type_params,
+ anf.attrs,
+ )
+ if isinstance(anf, tvm.relay.expr.Let):
+ value = anf.value
+ # record the constant expr to make sure all sugraph can find
correct constant.
+ if isinstance(value, tvm.relay.expr.Constant):
+ if not constant_expr:
+ constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
+ else:
+ constant_expr = tvm.relay.expr.Let(anf.var, value,
constant_expr)
+ if isinstance(value, tvm.relay.expr.Call):
+ new_args = []
+ # build current var list
+ cur_node_dep["nodes"][anf.var] = 0
+ # Get the dependency information of the nodes.
+ value, snode_dep, new_input_idx = parse_dependency(value,
snode_dep, new_input_idx)
+ if isinstance(value.op, tvm.ir.Op):
+ if value.op.name in operator_index_map:
+ operator_index_map[value.op.name] =
operator_index_map[value.op.name] + 1
+ else:
+ operator_index_map[value.op.name] = 0
+ split_operator_name = split_conf[0]["op_name"] if
split_conf else ""
+ split_operator_index = split_conf[0]["op_index"] if
split_conf else ""
+ if (
+ split_conf
+ and split_operator_name in operator_index_map
+ and operator_index_map[split_operator_name] >=
split_operator_index
+ ):
+ split_conf.pop(0)
+ snode_dep.append({"nodes": {}, "ref_nodes": {}})
+ ann = _recursion(
+ anf.body,
+ pipeline_mods,
+ split_conf,
+ constant_expr,
+ )
+ snode_dep.pop()
+ dep_vars = get_dep_var(snode_dep)
+ # When the nodes of current subgraph are the depedency
node of other
+ # subgraph, we need to set them as the output of
current subgraph.
+ body = relay.Tuple(dep_vars) if len(dep_vars) > 1 else
anf.var
+ # when current subgraph use previous subgraph constant,
Review Comment:
```suggestion
# when the current subgraph uses a previous subgraph
constant,
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
Review Comment:
```suggestion
dep["nodes"][var] += 1
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
+ if is_free_var:
+ need_update = True
+ new_args.append(relay.var(f"data_n_{new_input_idx}",
var.checked_type))
+ new_input_idx = new_input_idx + 1
Review Comment:
```suggestion
new_input_idx += 1
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
Review Comment:
```suggestion
for dep in snode_dep[:-1]:
```
##########
tests/python/relay/test_pipeline_executor.py:
##########
@@ -22,12 +22,193 @@
import tvm
import tvm.testing
from tvm import relay
-from tvm.relay import transform
+from tvm.relay import transform, build_module
+from tvm.relay.testing import run_opt_pass
from tvm.contrib import graph_executor, pipeline_executor,
pipeline_executor_build
from tvm._ffi import get_global_func
from tvm.contrib import cc as _cc
+"""Splitting graph into a list of subgraph"""
+
+
+def graph_split(expr, split_conf, params=None):
+ def get_dep_var(sub_var_dep):
+ return [var for var, _ in sub_var_dep[len(sub_var_dep) -
1]["ref_nodes"].items()]
+
+ def parse_dependency(value, snode_dep, new_input_idx):
+ new_args = []
+ need_update = False
+ for var in value.args:
+ is_free_var = False
+ for i in range(0, len(snode_dep) - 1):
+ dep = snode_dep[i]
+ if var in dep["nodes"]:
+ # Mark the previous subgraph node as a dependency.
+ dep["nodes"][var] = dep["nodes"][var] + 1
+ dep["ref_nodes"][var] = dep["nodes"][var]
+ # The var of this call is a free_var
+ is_free_var = True
+ # if the var of this call is free_var, recreate it and give it a
fixed input name.
Review Comment:
```suggestion
# if the var of this call is a free_var, recreate it and give it
a fixed input name.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]