huajsj commented on a change in pull request #8702:
URL: https://github.com/apache/tvm/pull/8702#discussion_r708835908



##########
File path: python/tvm/contrib/pipeline_executor.py
##########
@@ -0,0 +1,539 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Pipeline executor that executes a series of modules in a pipeline 
fashion."""
+import json
+import tvm._ffi
+from tvm import relay
+from tvm.relay.transform import InferType
+from tvm.contrib import graph_executor
+
+
+def pipeline_executor_enabled():
+    """Check if the pipeline executor is enabled.
+
+    Return
+    -------
+    enable: bool
+        Return whether the pipeline executor is enabled.
+    """
+    return tvm._ffi.get_global_func("tvm.pipeline_executor.create", 
allow_missing=True) is not None
+
+
+def build(pipe_configs):
+    """Build these modules used in the pipeline executor, then use these 
modules and configuration
+    to create a pipeline executor.
+
+    Parameters
+    ----------
+    pipe_configs: PipelineConfig
+        Build Configuration information.
+
+    Returns
+    -------
+    ret: PipelineExecutorFactoryModule
+        Common interface for pipeline executor factory modules.
+    """
+    mods = {}
+    mod_n_configs = pipe_configs.get_config()
+    config_len = len(mod_n_configs)
+    string_config = [{} for _ in range(config_len)]
+    for ir_mod, mod_config in mod_n_configs.items():
+        mconf = mod_config["pipeline"].copy()
+        mod_idx = mconf["mod_idx"] - 1
+        dev = mod_config["dev"]
+        target = mod_config["target"]
+        build_func = relay.build
+        # Check whether there is a customized build function.
+        if "build" in mod_config and mod_config["build"]:
+            build_func = mod_config["build"]
+
+        mod = build_func(
+            ir_mod,
+            target,
+            params=mod_config["params"],
+            target_host=mod_config["target_host"],
+            mod_name=mod_config["mod_name"],
+        )
+
+        mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id)
+        # Create a pipeline configuration.
+        string_config[mod_idx] = mconf
+        mods[mod] = {"dev": dev}
+
+    return PipelineExecutorFactoryModule(mods, string_config)
+
+
+class PipelineModule(object):
+    """Wrapper of runtime module, caller can use this module to set parameters 
and get outputs.
+
+    Parameters
+    ----------
+    module : PipelineExecutorFactoryModule
+        Common interface for pipeline executor factory modules.
+    """
+
+    def __init__(self, module):
+        self.module = module.module
+
+
+class PipelineConfig(object):
+    """Pipeline configuration information, this class contains the DAG that 
expresses
+    the dependency of each module involved by pipeline and the specific 
parameters
+    of each module build.
+    """
+
+    class Binding:
+        """This class defines the module connections information.
+        The type can only be "input" or "output".
+
+        Parameters
+        ----------
+        owner : ModuleWrapper
+            The class who owns this interface.
+
+        io_type : str
+            The I/O type of this interface. It can only be "input" or "output".
+
+        name : str/integer
+            Name, for input it is string such as "data0", for output it is an 
integer such as 0.
+
+        data_type: TensorType
+            The data type of this interface.
+        """
+
+        def __init__(self, owner, io_type, name, data_type=None):
+            self.io_owner = owner
+            self.io_type = io_type
+            self.name = str(name)
+            # Child interfaces that depend on this interface.
+            self.bindings = []
+            # Parents interfaces that this interface depend on.
+            self.parents = []
+
+            self.data_type = data_type
+
+        def get_name(self):
+            # Return name of this interface and the name of owner who owns 
this interface.
+            owner_name = ""
+            if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
+                owner_name = self.io_owner.name
+
+            return owner_name, self.name
+
+        def get_owner_idx(self):
+            # If the owner is ModuleWrapper return the owner index, if not 
return 0.
+            if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
+                return self.io_owner.idx
+
+            return 0
+
+        def is_global_interface(self):
+            """The global interface is the interface visible to the caller 
which use a pipeline
+            executor, the global input interface is responsible for passing 
parameters to the
+            internal module interface, and the global output interface is 
responsible for
+            outputting the results computed by the pipeline executor to a 
caller.
+            """
+            return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper)
+
+        def __repr__(self):
+            # Get all binding information.
+            ret = "  |{}: ".format(self.name)
+            for binding in self.bindings:
+                mname, dname = binding.get_name()
+                ret += "{0}:{1} ".format(mname, dname)
+            return ret
+
+        def check_dag_acyclic(self, start, inputs):
+            """This is to check whether the DAG containing these input 
interfaces is acyclic.
+            Parameters
+            ----------
+            start: ModuleWrapper
+                The starting node of the cycle check algorithm.
+
+            inputs: Binding
+                These interfaces are used to connect to each other to build 
DAG.
+
+            Return
+            ------
+                Return true if there is no cycle in the DAG.
+            """
+            for binding in inputs.values():
+                if start == binding.io_owner:
+                    return False
+                for p in binding.parents:
+                    if not self.check_dag_acyclic(start, 
p.io_owner.input_bindings.bindings):
+                        return False
+
+            return True
+
+        def connect(self, binding):
+            """Connect the current interface to the destination interface.
+            correct connections as following 1. global input connect to module 
input,
+            2. module output connect to global output, 3. module output 
connect to module input
+
+            Parameters
+            ----------
+            binding: Binding
+                The destination of this connection.
+            """
+
+            # Check whether the binding setting is correct or not.
+            if self.io_owner == binding.io_owner:
+                raise RuntimeError(f"Can not bind itself.")
+
+            if not self.is_global_interface() and self.io_type == "input":
+                raise RuntimeError(f"Module can only bind from output 
interface!")
+
+            if (
+                not self.is_global_interface()
+                and not binding.is_global_interface()
+                and binding.io_type == "output"
+            ):
+                raise RuntimeError(f"Can not bind module output with another 
module output!")
+
+            if (
+                not self.is_global_interface()
+                and binding.is_global_interface()
+                and binding.io_type == "input"
+            ):
+                raise RuntimeError(f"Can not bind module output with global 
input!")
+
+            if self.is_global_interface() and self.io_type == "output":
+                raise RuntimeError(f"Global output can not be used as binding 
start point.")
+
+            if self.is_global_interface() and binding.io_type != "input":
+                raise RuntimeError(f"Global input can only bind with module 
input.")
+
+            self.bindings.append(binding)
+            if not self.is_global_interface():
+                # Check whether the data types of the source and destination 
are the same.
+                if (
+                    isinstance(binding.io_owner, PipelineConfig.ModuleWrapper)
+                    and self.data_type != binding.data_type
+                ):
+                    raise RuntimeError(
+                        f"Illegal type (%s vs. %s): binding type is not same!"
+                        % (self.data_type, binding.data_type)
+                    )
+
+                binding.parents.append(self)
+                # Do acyclic check after increase the in-degree.

Review comment:
       Fixed.
   the in-degree count by the length of binding.parents, changed the comments 
into
   "Do acyclic check after increasing the in-degree of child node by setting 
current interface as a parent of the child node"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to