zhaoyang-star commented on PR #11334:
URL: https://github.com/apache/tvm/pull/11334#issuecomment-1283603687

   Hi @huajsj , Thanks for your contribution. I recently tried to use Pipeline 
Executor to optimize my inference. Based on `test_pipeline_executor.py` I tried 
to split ResNet18 to 3 subgraphs and verify the result compared to the normal 
way. But the result of pipeline diff with the normal way. 
   ```shell
           np.testing.assert_allclose(actual.shape, desired.shape)
   >       np.testing.assert_allclose(actual, desired, rtol=rtol, atol=atol, 
verbose=True)
   E       AssertionError: 
   E       Not equal to tolerance rtol=1e-07, atol=1e-07
   E       
   E       Mismatched elements: 1000 / 1000 (100%)
   E       Max absolute difference: 0.0008707
   E       Max relative difference: 0.7465138
   E        x: array([[0.000775, 0.001194, 0.000832, 0.001083, 0.00134 , 
0.001236,
   E               0.001081, 0.00083 , 0.001295, 0.001404, 0.000695, 0.000878,
   E               0.000933, 0.000871, 0.00145 , 0.000928, 0.001229, 
0.000944,...
   E        y: array([[0.000947, 0.00105 , 0.000971, 0.001059, 0.001042, 
0.001014,
   E               0.001078, 0.000846, 0.001069, 0.000998, 0.000913, 0.001075,
   E               0.000989, 0.000885, 0.00121 , 0.000972, 0.001071, 
0.000932,...
   
   ```
   I have checked the subgraphs and found they are all right. Could you please 
help to find the reason of difference between pipeline way and the normal way? 
Thanks for your kind help.
   
   Please run `apt-get install graphvis` first for visulize subgraph.
   
   ```python3
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   
   import pytest
   import os
   import time
   import numpy as np
   import tvm
   import tvm.testing
   from tvm import relay
   from tvm.relay import transform, build_module
   from tvm.contrib import graph_executor, pipeline_executor, 
pipeline_executor_build
   from tvm._ffi import get_global_func
   from tvm.contrib import cc as _cc
   import inspect
   tutorial_dir = os.path.dirname(inspect.getfile(lambda: None))
   os.sys.path.append(os.path.join(tutorial_dir, "../../../tests/python/relay"))
   from test_pipeline_executor import graph_split, reset_cpu_affinity
   from tvm.contrib import relay_viz
   
   
   graph_attr = {"color": "red"}
   node_attr = {"color": "blue"}
   edge_attr = {"color": "black"}
   # VizNode is passed to the callback.
   def get_node_attr(node):
       if "nn.conv2d" in node.type_name and "NCHW" in node.detail:
           return {
               "fillcolor": "green",
               "style": "filled",
               "shape": "box",
           }
       if "Var" in node.type_name:
           return {"shape": "ellipse"}
       return {"shape": "box"}
   
   
   dot_plotter = relay_viz.DotPlotter(
       graph_attr=graph_attr,
       node_attr=node_attr,
       edge_attr=edge_attr,
       get_node_attr=get_node_attr)
   
   
   def get_split_mod():
       mod, params = tvm.relay.testing.resnet.get_workload(
           batch_size=1, num_classes=1000, num_layers=18)
       print("Outputing mod.pdf ...")
       viz = relay_viz.RelayVisualizer(
           mod,
           plotter=dot_plotter,
           parser=relay_viz.DotVizParser())
       viz.render("mod")
       split_conf = [{"op_name": "add", "op_index": 1}, {"op_name": "add", 
"op_index": 4}]
       mods = graph_split(mod["main"], split_conf, params)
       return mods, params, (1, 3, 224, 224), mod
   
   
   def build_relay(mod, params):
       dev = tvm.device("llvm", 0)
       with tvm.transform.PassContext(opt_level=3):
           lib = relay.build_module.build(mod, "llvm", params=params)
       module = graph_executor.GraphModule(lib["default"](dev))
       return module
   
   
   def run_relay(module, in_data):
       module.set_input("data", in_data)
       module.run()
       return module.get_output(0).numpy()
   
   
   def test_pipeline():
       if pipeline_executor_build.pipeline_executor_build_enabled():
           target_list = tvm.testing.enabled_targets()
           print(f"target_list:{target_list}")
           for target in target_list:
               affinity = os.sched_getaffinity(0)
               # Get the three pipeline modules here.
               (mod1, mod2, mod3), params, dshape, ori_mod = get_split_mod()
   
               print("Outputing mod1.pdf ...")
               viz = relay_viz.RelayVisualizer(
                   mod1,
                   plotter=dot_plotter,
                   parser=relay_viz.DotVizParser())
               viz.render("mod1")
               print("Outputing mod2.pdf ...")
               viz = relay_viz.RelayVisualizer(
                   mod2,
                   plotter=dot_plotter,
                   parser=relay_viz.DotVizParser())
               viz.render("mod2")
               print("Outputing mod3.pdf ...")
               viz = relay_viz.RelayVisualizer(
                   mod3,
                   plotter=dot_plotter,
                   parser=relay_viz.DotVizParser())
               viz.render("mod3")
   
               # Prepare batch data for pipeline computation.
               datas = []
               for i in range(5):
                   datas.append(np.full(dshape, 3 + i).astype("float32"))
   
               pipe_config = pipeline_executor_build.PipelineConfig()
   
               # The pipeline input named "data_a" will be connected to a input 
named "data"
               # of mod1.
               
pipe_config["input"]["data_a"].connect(pipe_config[mod1]["input"]["data"])
   
               # The mod1 output[0] will be connected to a input named 
"data_n_0" of mod2.
               
pipe_config[mod1]["output"][0].connect(pipe_config[mod2]["input"]["data_n_0"])
   
               # The mod2 output[2] will be connected to a input named 
"data_n_1" of mod3.
               
pipe_config[mod2]["output"][0].connect(pipe_config[mod3]["input"]["data_n_1"])
   
               # The mod3 output[0] will be connected to pipeline output[0].
               
pipe_config[mod3]["output"][0].connect(pipe_config["output"]["0"])
               print(f"pipe_config:\n {pipe_config}")
               # Print configuration (print(pipe_config)), the result looks 
like following.
               #
               # Params
               #
               # Inputs
               #   |data_a: mod0:data
               #
               # output
               #   |output(0) : mod2.output(0)
               #
               # connections
               #   |mod0.output(0)-> mod1.data_n_0
               #   |mod1.output(0)-> mod2.data_n_1
   
               # Set other parameters.
               pipe_config[mod1].target = target[0]
               pipe_config[mod1].dev = target[1]
               pipe_config[mod1].cpu_affinity = "0"
               pipe_config[mod1].fcompile = _cc.create_shared
   
               pipe_config[mod2].target = "llvm"
               pipe_config[mod2].dev = tvm.cpu(0)
               pipe_config[mod2].cpu_affinity = "1"
   
               pipe_config[mod3].target = "llvm"
               pipe_config[mod3].dev = tvm.cpu(0)
               pipe_config[mod3].cpu_affinity = "2"
               # Checking the configuration of modules dependency.
               mconfig = pipe_config.get_config()
               with open('module_connection.config', 'w') as f:
                   f.write(f'module_connection 
config:\n{mconfig["module_connection"]}')
   
               # Build and create a pipeline module.
               with tvm.transform.PassContext(opt_level=3):
                   pipeline_mod_factory = 
pipeline_executor_build.build(pipe_config)
   
               # Export the parameter configuration to a file.
               directory_path = tvm.contrib.utils.tempdir().temp_dir
               # If the directory does not exist, create it.
               if not os.path.exists(directory_path):
                   os.makedirs(directory_path)
               config_file_name = 
pipeline_mod_factory.export_library(directory_path)
   
               # Use the output of build to create and initialize 
PipelineModule.
               pipeline_module = 
pipeline_executor.PipelineModule(pipeline_mod_factory)
               assert pipeline_module
   
               # Use the import function to create and initialize 
PipelineModule.
               pipeline_module_test = 
pipeline_executor.PipelineModule.load_library(config_file_name)
               assert pipeline_module_test.num_outputs == 1
   
               input_map = pipeline_module_test.get_input_pipeline_map("data_a")
               assert input_map[0] == "0" and input_map[1] == "data"
   
               # Build the original mod
               normal_outputs = []
               module = build_relay(ori_mod, params)
   
               for round in range(0, len(datas)):
                   data = datas[round]
                   # Setting the input data into the pipeline executor.
                   pipeline_module_test.set_input("data_a", tvm.nd.array(data))
                   input_map = 
pipeline_module_test.get_input_pipeline_map("data_a")
                   # Checking whether the input setting of the first runtime is 
successful.
                   # The input of the rest of runtime will go into a queue and 
we can not check
                   # these input data here.
                   if input_map[0] == "0":
                       input_data = pipeline_module_test.get_input("data_a")
                       tvm.testing.assert_allclose(data, input_data.numpy())
   
                   assert pipeline_module_test.num_inputs == 1
                   # Running the pipeline executor in the pipeline mode.
                   pipeline_module_test.run()
                   # Running the original mod.
                   normal_outputs.append(run_relay(module, data))
   
               for k in range(0, len(datas)):
                   statistic_time = 0
                   outputs = pipeline_module_test.get_output()
                   while len(outputs) == 0:
                       outputs = pipeline_module_test.get_output()
                       statistic_time = statistic_time + 1
                       # Setting the timeout to 10 seconds.
                       assert statistic_time < 5
                       time.sleep(1)
   
                   # np.savetxt("baseline.out", normal_outputs[k])
                   # np.savetxt("pipeline.out", outputs[0].numpy())
                   for i in range(len(outputs)):
                       tvm.testing.assert_allclose(normal_outputs[i], 
outputs[i].numpy())
   
                       assert pipeline_module_test.num_executing_pipeline == 
round + 1
   
               # Reset the cpu affinity after a test.
               # reset_cpu_affinity(affinity)
   
   
   if __name__ == "__main__":
       pytest.main([__file__])
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to