huajsj commented on a change in pull request #8702:
URL: https://github.com/apache/tvm/pull/8702#discussion_r706876392



##########
File path: python/tvm/contrib/pipeline_executor.py
##########
@@ -0,0 +1,529 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Pipeline executor that executes a series of modules in a pipeline 
fashion."""
+import json
+import tvm._ffi
+from tvm import relay
+from tvm.relay.transform import InferType
+from tvm.contrib import graph_executor
+
+
+def pipeline_executor_enabled():
+    """Check if the pipeline executor is enabled.
+
+    Return
+    -------
+    enable: bool
+        Return whether pipeline executor is enabled.
+    """
+    return tvm._ffi.get_global_func("tvm.pipeline_executor.create", 
allow_missing=True) is not None
+
+
+def build(pipe_configs):
+    """Use pipe_config to build and return Module list and Module dependency 
configuration.
+
+    Parameters
+    ----------
+    pipe_configs: PipelineConfig
+        Configuration information for build.
+
+    Returns
+    -------
+    ret: PipelineExecutorFactoryModule
+        A class that wraps module list and module dependency configuration.
+    """
+    mods = {}
+    mod_n_configs = pipe_configs.get_config()
+    config_len = len(mod_n_configs)
+    string_config = [{} for _ in range(config_len)]
+    for ir_mod, mod_config in mod_n_configs.items():
+        mconf = mod_config["pipeline"].copy()
+        mod_idx = mconf["mod_idx"] - 1
+        # Get mod device configuration.
+        dev = mod_config["dev"]
+        target = mod_config["target"]
+        build_func = relay.build
+        # Check whether there is a customized build function.
+        if "build" in mod_config and mod_config["build"]:
+            build_func = mod_config["build"]
+
+        # Build.
+        mod = build_func(
+            ir_mod,
+            target,
+            params=mod_config["params"],
+            target_host=mod_config["target_host"],
+            mod_name=mod_config["mod_name"],
+        )
+
+        mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id)
+        # Create pipeline configuration.
+        string_config[mod_idx] = mconf
+        # Set device.
+        mods[mod] = {"dev": dev}
+
+    return PipelineExecutorFactoryModule(mods, string_config)
+
+
+def create(pipe_executor_factory_module):
+    """Create a pipeline runtime executor.
+
+    Parameters
+    ----------
+    pipe_executor_factory_module : PipelineExecutorFactoryModule
+        It is wrapper class which include IRModule list and pipeline 
configuration.
+
+    Returns
+    -------
+    submodule : PipelineModule
+        Runtime pipeline module.
+    """
+
+    return PipelineModule(pipe_executor_factory_module)
+
+
+class PipelineModule(object):
+    """Wrapper of runtime module.
+
+    Parameters
+    ----------
+    pipeline_config : Dict[GraphExecutorFactoryModule, Dict[str, Any]]
+        Modules and modules dependency configuration informaitons.
+    """
+
+    def __init__(self, pipe_mod_config):
+        self.pipeline_mods = pipe_mod_config.pipeline_mods
+        self.mod_config = pipe_mod_config.mods_config
+        mods, config = self.graph_executor_create(self.pipeline_mods, 
self.mod_config)
+        assert (
+            pipeline_executor_enabled()
+        ), "Pipeline executor is not enabled. Please \
+              re-build TVM with USE_PIPELINE_EXECUTOR=ON"
+        pipeline_create = tvm._ffi.get_global_func(
+            "tvm.pipeline_executor.create", allow_missing=False
+        )
+        assert pipeline_create
+        module = pipeline_create(mods, config)
+
+        self.module_ = module
+
+    def graph_executor_create(self, pipeline_mods, mod_config):
+        """Create graph_executor list and return configuration as a json 
string.
+
+        Parameters
+        ----------
+        pipeline_mods : List[GraphExecutorFactoryModule]
+          List of GraphExecutorFactoryModule
+
+        mod_config : Dict[str, Any]
+            Modules dependency configuration information.
+
+        Returns
+        -------
+        mods : List[Module]
+            Module list.
+
+        mod_config : str
+            Mods configuration.
+        """
+
+        mods = []
+        for pipeline_mod in pipeline_mods:
+            mod = graph_executor.GraphModule(
+                pipeline_mod["default"](pipeline_mods[pipeline_mod]["dev"])
+            )
+            mods.append(mod.module)
+
+        return mods, json.dumps(mod_config)
+
+
+class PipelineConfig(object):
+    """The wrapper of each module to be pipelined. The wrapper mainly includes 
the
+    module itself as well as the binding that represents the connections of 
this
+    module's inputs and outputs to other modules.
+    """
+
+    class Binding:
+        """This class define the module connection information.
+        The type can only be either "input" or "output".
+
+        Parameters
+        ----------
+        owner : ModuleWrapper
+            The class who owns this interface.
+
+        io_type : str
+            The type of this interface. It can only be either "input" or 
"output".
+
+        name : str/integer
+            Name, for input it is string such as "data0", for output it is the
+            idx integer such as 0.
+        """
+
+        def __init__(self, owner, io_type, name, data_type=None):
+            self.io_owner = owner
+            self.io_type = io_type
+            self.name = str(name)
+            # Child nodes.
+            self.bindings = []
+            # Parents nodes.
+            self.parents = []
+
+            self.data_type = data_type
+
+        def get_name(self):
+            """Return the interface name and name of owner who own this 
interface."""
+            owner_name = ""
+            if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
+                owner_name = self.io_owner.name
+
+            return owner_name, self.name
+
+        def get_owner_idx(self):
+            """Return owner idex if owner is ModuleWrapper, if not return 0."""
+            if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
+                return self.io_owner.idx
+
+            # If not ModuleWrapper then owner is PipelineConfig, return 0
+            # to identify this is global interface
+            return 0
+
+        def is_global_interface(self):
+            """It is to check whether this interface is global interface."""
+            return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper)
+
+        def __repr__(self):
+            """Get all binding(input data) informations that looks like 
'|data_0: mod1:data_0'."""
+            ret = "  |{}: ".format(self.name)
+            for binding in self.bindings:
+                mname, dname = binding.get_name()
+                ret += "{0}:{1} ".format(mname, dname)
+            return ret
+
+        def check_dag_acyclic(self, start, inputs):
+            """It is to check whether the DAG that contain the inputs 
interfaces is acyclic."""
+            for binding in inputs.values():
+                if start == binding.io_owner:
+                    return False
+                for p in binding.parents:
+                    if not self.check_dag_acyclic(start, 
p.io_owner.input_bindings.bindings):
+                        return False
+
+            return True
+
+        def connect(self, binding):
+            """
+            # Check whether the binding settings is correct or not.
+            # correct connection are following
+            # 1. global input to module input
+            # 2. module output to global output
+            # 3. module output to module input
+            """
+            if self.io_owner == binding.io_owner:
+                raise RuntimeError(f"Can not bind itself.")
+
+            if not self.is_global_interface() and self.io_type == "input":
+                raise RuntimeError(f"Module can only bind from output 
interface!")
+
+            if (
+                not self.is_global_interface()
+                and not binding.is_global_interface()
+                and binding.io_type == "output"
+            ):
+                raise RuntimeError(f"Can not bind module output with another 
module output!")
+
+            if (
+                not self.is_global_interface()
+                and binding.is_global_interface()
+                and binding.io_type == "input"
+            ):
+                raise RuntimeError(f"Can not bind module output with global 
input!")
+
+            if self.is_global_interface() and self.io_type == "output":
+                raise RuntimeError(f"Global output can not be used as binding 
start point.")
+
+            if self.is_global_interface() and binding.io_type != "input":
+                raise RuntimeError(f"Global input can only bind with module 
input.")
+
+            self.bindings.append(binding)
+            if not self.is_global_interface():
+                # check if the source and target data_type same
+                if (
+                    isinstance(binding.io_owner, PipelineConfig.ModuleWrapper)
+                    and self.data_type != binding.data_type
+                ):
+                    raise RuntimeError(
+                        f"Illegal type (%s vs. %s): binding type is not same!"
+                        % (self.data_type, binding.data_type)
+                    )
+
+                binding.parents.append(self)
+                # Do acyclic check after increase the in-degree.
+                if not self.check_dag_acyclic(
+                    binding.io_owner, self.io_owner.input_bindings.bindings
+                ):
+                    raise RuntimeError(f"Illegal connection: Cause a circle!")
+
+    class BindingList:
+        """Container for bindings(input or output interface).
+
+        Parameters
+        ----------
+        owner : ModuleWrapper/PipelineConfig
+            The owner of this list can be ModuleWrapper or PipelineConfig.
+
+        type_name : str
+            The type of this binding list can be either "input" or "output".
+        """
+
+        def __init__(self, owner, type_name):
+            self.bindings = {}
+            self.io_owner = owner
+            self.binding_type = type_name
+
+        def get_binding_data_type(self, key):
+            if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
+                return self.io_owner.get_data_type(key, self.binding_type)
+            return None
+
+        def __getitem__(self, key):
+            if key not in self.bindings:
+                data_type = self.get_binding_data_type(key)
+                if not data_type and isinstance(self.io_owner, 
PipelineConfig.ModuleWrapper):
+                    raise RuntimeError(f"Can not find {key} in binding list 
{self.binding_type}.")
+
+                self.bindings[key] = PipelineConfig.Binding(
+                    self.io_owner, self.binding_type, key, data_type
+                )
+
+            return self.bindings[key]
+
+    class ModuleWrapper:
+        """This class is a wrapper that represent the module, contains module 
informations,
+        binding informations and building informations.
+        """
+
+        def __init__(self, mod=None):
+            """Init class"""
+            self.target_host = None
+            self.build_func = None
+            self.params = None
+            self.target = None
+            self.name = None
+            self.dev = None
+            self.idx = None
+            self.mod = mod
+            self.input_params = InferType()(mod)["main"].params
+            self.output_values = InferType()(mod)["main"].checked_type.ret_type
+            self.input_bindings = PipelineConfig.BindingList(self, "input")
+            self.output_bindings = PipelineConfig.BindingList(self, "output")
+
+        def __eq__(self, other):
+            if isinstance(other, PipelineConfig.ModuleWrapper):
+                return self.mod == other.mod
+
+            return False
+
+        def __getitem__(self, key):
+            if isinstance(key, str):
+                if key == "input":
+                    return self.input_bindings
+
+                if key == "output":
+                    return self.output_bindings
+
+            raise RuntimeError(f"{key} not found!")
+
+        def get_data_type(self, key, stype):
+            """Get the data type of the input or output interface of the 
module."""

Review comment:
       stype is interface type for example "input", changed name into 
interface_type




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to