huajsj commented on a change in pull request #8702:
URL: https://github.com/apache/tvm/pull/8702#discussion_r697914291



##########
File path: tests/python/relay/test_pipeline_executor.py
##########
@@ -0,0 +1,256 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+import tvm
+import tvm.testing
+from tvm import relay
+from tvm.relay import transform
+from tvm.contrib import graph_executor, pipeline_executor
+
+
+def get_mannual_mod():
+    """
+    # get a list of modules, each representing a subgraph
+    """
+    mods = []
+    dshape = (3, 3)
+    data = relay.var("data_0", relay.TensorType(dshape, "float32"))
+    data21 = relay.var("data_1", relay.TensorType(dshape, "float32"))
+    data_net1_output_1 = relay.var("data_0", relay.TensorType(dshape, 
"float32"))
+    data_net1_output_2 = relay.var("data_1", relay.TensorType(dshape, 
"float32"))
+    data_net2_output_1 = relay.var("data_0", relay.TensorType(dshape, 
"float32"))
+    mvalue1 = np.full((1), 1).astype("float32")
+    mvalue2 = np.full((1), 2).astype("float32")
+    mvalue3 = np.full((1), 3).astype("float32")
+    mv1 = relay.Constant(tvm.nd.array(mvalue1))
+    mv2 = relay.Constant(tvm.nd.array(mvalue2))
+    mv3 = relay.Constant(tvm.nd.array(mvalue3))
+
+    """
+    # net1 has three outputs; output3 is the final output.
+    """
+
+    net_output1 = relay.add(data, mv1)
+    net_output2 = relay.subtract(data, mv2)
+    net_output3 = relay.multiply(data, mv3)
+
+    """
+    # net2 uses net1 output1 as input.
+    """
+    net2 = relay.add(data_net1_output_1, mv2)
+    net2 = relay.add(net2, data21)
+    net2 = relay.add(net2, mv3)
+
+    """
+    # net3 uses net2 output1 and net1 output2 as input.
+    """
+    net3 = relay.multiply(data_net2_output_1, mv3)
+    net3 = relay.add(net3, data_net1_output_2)
+
+    mods.append(
+        tvm.IRModule.from_expr(
+            relay.Function([data], relay.Tuple([net_output1, net_output2, 
net_output3]))
+        )
+    )
+    mods.append(tvm.IRModule.from_expr(relay.Function([data_net1_output_1, 
data21], net2)))
+    mods.append(
+        tvm.IRModule.from_expr(relay.Function([data_net1_output_2, 
data_net2_output_1], net3))
+    )
+
+    return mods, dshape
+
+
+def get_manual_conf(mods):
+    """
+    # This function is used to generate a manual pipeline configuration;
+    # the result is used to verify that the pipe configuration can generate
+    # a correct result.
+    """
+    mod_config = {}
+    """
+    # set configuration
+    """
+    mconfig1 = {}
+    """
+    # the third output is the final output, the second output is for mod3,
+    # and the first is for mod2 input
+    """
+    mconfig1["pipeline"] = {
+        "mod_indx": 1,
+        "output": [
+            {"output_indx": 0, "dependent": [{"mod_indx": 2, "input_name": 
"data_0"}]},
+            {"output_indx": 1, "dependent": [{"mod_indx": 3, "input_name": 
"data_0"}]},
+            {"output_indx": 2, "dependent": [{"mod_indx": 0, "input_name": 
"0"}]},
+        ],
+    }
+    mod_config[mods[0]] = mconfig1
+
+    mconfig2 = {}
+    mconfig2["pipeline"] = {
+        "mod_indx": 2,
+        "output": [
+            {"output_indx": 0, "dependent": [{"mod_indx": 3, "input_name": 
"data_1"}]},
+        ],
+    }
+    mod_config[mods[1]] = mconfig2
+
+    mconfig3 = {}
+
+    mconfig3["pipeline"] = {
+        "mod_indx": 3,
+        "output": [{"output_indx": 0, "dependent": [{"mod_indx": 0, 
"input_name": "1"}]}],
+    }
+    mod_config[mods[2]] = mconfig3
+    return mod_config
+
+
+def pipeline_module_create(target):
+    """
+    # Get 3 pipeline modules.
+    """
+    (mod1, mod2, mod3), dshape = get_mannual_mod()
+
+    # Prepare batch data for pipeline feeding
+    datas = []
+    for i in range(5):
+        datas.append(np.full(dshape, 3 + i).astype("float32"))
+
+    pipe_config = pipeline_executor.PipelineModuleConfig([mod1, mod2, mod3])
+
+    # Create pipeline compute input/output and subgraph dependent relation.
+
+    # pipeline compute input "data_0" would get forward to mod1 as input 
"data_0"
+    pipe_config.connect(pipe_config.pipe_input("data_0"), 
pipe_config[mod1].input("data_0"))
+
+    # pipeline compute input "data_1" would get forward to mod2 as input 
"data_1"
+    pipe_config.connect(pipe_config.pipe_input("data_1"), 
pipe_config[mod2].input("data_1"))
+
+    # mod1 output(0) would get forward to mod2 as input "data_0"
+    pipe_config.connect(pipe_config[mod1].output(0), 
pipe_config[mod2].input("data_0"))
+
+    # mod1 output(1) would get forward to mod3 as input "data_0"
+    pipe_config.connect(pipe_config[mod1].output(1), 
pipe_config[mod3].input("data_0"))
+
+    # mod2 output(0) would get forward to mod3 as input "data_1"
+    pipe_config.connect(pipe_config[mod2].output(0), 
pipe_config[mod3].input("data_1"))
+
+    # mod1 output(2) would get forward as final pipeline compute output(1)
+    pipe_config.connect(pipe_config[mod1].output(2), 
pipe_config.pipe_output("0"))
+
+    # mod3 output(0) would get forward as final pipeline compute output(2)
+    pipe_config.connect(pipe_config[mod3].output(0), 
pipe_config.pipe_output("1"))
+    """
+    # print the configuration; the expected result looks like the following.
+    #
+    #Inputs
+    #  |data_0: mod1:data_0
+    #  |data_1: mod2:data_1
+    #
+    #output
+    #  |output(1) : mod1.output(2)
+    #  |output(2) : mod3.output(0)
+    #
+    #connections
+    #  |mod1.output(0)-> mod2.data_0
+    #  |mod1.output(1)-> mod3.data_0
+    #  |mod2.output(0)-> mod3.data_1
+    """
+
+    print(pipe_config)
+
+    """
+    # verify connection correctness
+    """
+    try:
+        pipe_config.connect(pipe_config[mod2].output(0), 
pipe_config[mod1].input("data_0"))
+        assert 0, f"wrong module connect order check not pass!"
+        pipe_config.connect(pipe_config.pipe_input("data_0"), 
pipe_config[mod1].output(0))
+        assert 0, f"wrong global input connect check not pass!"
+    except:
+        print("connection correctness check pass")
+
+    """
+    # get text format configuration.
+    """
+
+    pconfig = pipe_config.get_config()
+
+    """
+    # check whether the configuration matches the expectation.
+    """
+    assert pconfig == get_manual_conf([mod1, mod2, mod3])
+
+    """
+    # generate the configuration for the build process
+    """
+
+    mod_config = {}
+    mconfig1 = pconfig[mod1]
+    mconfig1["target_host"] = None
+    mconfig1["mod_name"] = "default"
+    mconfig1["build"] = None
+    mconfig1["params"] = None
+    mconfig1["target"] = target[0]
+    mconfig1["dev"] = target[1]
+    mod_config[mod1] = mconfig1
+
+    mconfig2 = pconfig[mod2]
+    mconfig2["target_host"] = None
+    mconfig2["mod_name"] = "default"
+    mconfig2["build"] = None
+    mconfig2["params"] = None
+    mconfig2["target"] = "llvm"
+    mconfig2["dev"] = tvm.cpu(0)
+    mod_config[mod2] = mconfig2
+
+    mconfig3 = pconfig[mod3]
+    mconfig3["target_host"] = None
+    mconfig3["mod_name"] = "default"
+    mconfig3["build"] = None
+    mconfig3["params"] = None
+    mconfig3["target"] = "llvm"
+    mconfig3["dev"] = tvm.cpu(0)
+    mod_config[mod3] = mconfig3
+
+    """
+    # Test build and create pipeline module
+    """
+    with relay.build_config(opt_level=3):
+        pipeline_mods, string_config = 
pipeline_executor.build_pipeline(mod_config)
+
+    pipeline_module = pipeline_executor.create(pipeline_mods, string_config)
+    return pipeline_module
+
+
+def pipeline(target):
+    module = pipeline_module_create(target)
+    """
+    # Check whether the value created by the pipeline executor is valid.
+    """
+    assert module

Review comment:
       fixed

##########
File path: python/tvm/contrib/pipeline_executor.py
##########
@@ -0,0 +1,352 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Pipeline executor that executes pipeline containing TVM PackedFunc."""
+import json
+import tvm._ffi
+from tvm import relay
+from tvm.contrib import graph_executor
+
+
+def pipeline_executor_enabled():
+    """Check whether the pipeline executor is enabled.
+    Return
+    ------
+    enable: bool
+        whether the pipeline executor is enabled or not
+    """
+    pipeline_enabled = False
+    try:
+        pipelinecreate = 
tvm._ffi.get_global_func("tvm.pipeline_executor.create")
+        assert pipelinecreate
+        pipeline_enabled = True
+    except ValueError:
+        print("pipeline executor not enabled!")
+
+    return pipeline_enabled
+
+
+def build_pipeline(mod_n_configs):
+    """Build a module list that can be used for pipeline execution.
+
+    Parameters
+    ----------
+    mod_n_configs: Dict[IRModule, Dict[str, Any]]
+        build configuration information, structured like the following.
+        {IRModule: {"target":target,
+                    "target_host":target_host,
+                    "params":params,
+                    "mod_name": mod_name,
+                    "build":build}}
+
+    Returns
+    -------
+    ret: List[IRModule]
+        list of IRModule
+    string_config: Dict[int, Dict[str, any]]
+        pipeline configuration
+    """
+    mods = {}
+    config_len = len(mod_n_configs)
+    string_config = [{} for _ in range(config_len)]
+    for _, (ir_mod, mod_config) in enumerate(mod_n_configs.items()):
+        # init lib_name and json_name params with empty
+        lib_name = ""
+        json_name = ""
+        params_name = ""
+        # Get module configuration
+        assert "pipeline" in mod_config and "mod_indx" in 
mod_config["pipeline"]
+        # Get module index in pipeline configuration
+        mconf = mod_config["pipeline"].copy()
+        # Get mod device config
+        dev = mod_config["dev"]
+        mod_indx = mconf["mod_indx"] - 1
+        target = mod_config["target"]
+        assert mod_indx < config_len
+        build_func = relay.build
+        # if there is a self defined build function then use it.
+        if "build" in mod_config and mod_config["build"]:
+            build_func = mod_config["build"]
+
+        # build IRModule
+        mod = build_func(
+            ir_mod,
+            target,
+            params=mod_config["params"],
+            target_host=mod_config["target_host"],
+            mod_name=mod_config["mod_name"],
+        )
+
+        mconf["lib_name"] = lib_name
+        mconf["json_name"] = json_name
+        mconf["params_name"] = params_name
+        mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id)
+        # Create pipeline configuration
+        string_config[mod_indx] = mconf
+        # associate mod with device
+        mods[mod] = {"dev": dev}
+
+    # return IRModule list and pipeline configuration
+    return mods, string_config
+
+
+def create(pipeline_mods, mod_config):
+    """Create a pipeline runtime executor.
+
+    Parameters
+    ----------
+    pipeline_mods : List[IRModule]
+        list of IRModule
+
+    mod_config : Dict[int, Dict[str, Any]]
+        modules and module dependency configuration information.
+
+    Returns
+    -------
+    submodule : PipelineModule
+        Runtime pipeline module.
+    """
+
+    submodule = PipelineModule(pipeline_mods, mod_config)
+    return submodule
+
+
+class PipelineModule(object):
+    """Wrapper runtime module. This is a thin wrapper of the underlying TVM 
module.
+    Parameters
+    ----------
+    pipeline_mods : List[GraphModule]
+        The internal tvm module that holds the actual graph functions.
+
+    pipeline_config : Dict[IRModule, Dict[str, Any]]
+        modules and module dependency configuration information.
+
+    """
+
+    def __init__(self, pipeline_mods, pipeline_config):
+        self.pipeline_mods = pipeline_mods
+        self.mod_config = pipeline_config
+        mods, config = self.graph_executor_create(pipeline_mods, 
pipeline_config)
+
+        pipelinecreate = 
tvm._ffi.get_global_func("tvm.pipeline_executor.create")
+        assert pipelinecreate
+        module = pipelinecreate(mods, config)
+
+        self.module_ = module
+
+    def graph_executor_create(self, pipeline_mods, mod_config):
+        """Create a pipeline runtime executor.
+
+        Parameters
+        ----------
+        pipeline_mods : List[IRModule]
+          list of IRModule
+
+        mod_config : Dict[int, Dict[str, Any]]
+            modules and module dependency configuration information.
+
+        Returns
+        -------
+        mods : GraphModule
+            Runtime graph module.
+        """
+
+        mods = []
+        for pipeline_mod in pipeline_mods:
+            mod = graph_executor.GraphModule(
+                pipeline_mod["default"](pipeline_mods[pipeline_mod]["dev"])
+            )
+            mods.append(mod.module)
+
+        return mods, json.dumps(mod_config)
+
+
+class PipelineModuleConfig:
+    """Pipeline Configuration Class. In this class there are 2 internal classes:
+    the first is Instance, which is used to represent a Module; the second is
+    Interface, which is used to represent Module input/output and Pipeline
+    Module input/output. By setting dependency relations between Interfaces,
+    this class can build the module connection relation.
+
+    The class hierarchy is as follows.
+         PipelineModuleConfig ---> Pipe   Instance ---> Interface(input/output)
+                              ---> Module Instance ---> Interface(input/output)

Review comment:
       The reason to use one more class to wrap the interface is that we need a
method to build/find an interface by using a module and an interface name (for
example, mod1 input "data_0"). First, we cannot implement such a function inside
the Interface class; second, if we implemented it inside PipelineModuleConfig,
it would look like 'PipelineModuleConfig.input(mod1, "data_0")', which is not as
straightforward as 'PipelineModuleConfig[mod1].input("data_0")'. Hence I added
one additional class to make the API usage more readable.
   I agree that the name 'Instance' is vague; I have already changed it to
'Module'. Please let me know what you think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to