This is an automated email from the ASF dual-hosted git repository.

mousius pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tvm.git


The following commit(s) were added to refs/heads/main by this push:
     new 24f49f1aeb [CI] Apply linting rules to AOT tests (#11657)
24f49f1aeb is described below

commit 24f49f1aeb4643df97ce82a14a0d2c4f55d637f7
Author: Christopher Sidebottom <[email protected]>
AuthorDate: Wed Jun 15 15:11:57 2022 +0100

    [CI] Apply linting rules to AOT tests (#11657)
    
    This enables pylint against the AOT test cases.
    
    One issue I found was with `tvm.testing.parameter`, which breaks the naming convention rules in pylint (constants are upper case and function parameters are lower case). It may be worth considering a syntax similar to:
    
    ```
    tvm.testing.parameter("enable_usmp", [True, False])
    ```
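    
    For reference, a minimal sketch of the interim pattern this patch adopts in
    `test_cpp_aot.py`: replacing the module-level `tvm.testing.parameter`
    assignments with `@pytest.mark.parametrize`, so the names remain function
    parameters rather than pylint-flagged lower-case "constants" (the
    `test_example` body here is a placeholder):
    
    ```
    import pytest
    
    # Previously, at module scope, pylint's invalid-name check fired because
    # module-level constants are expected to be UPPER_CASE:
    #   enable_usmp = tvm.testing.parameter(True, False)
    
    # Parametrizing via the decorator keeps the names as function
    # parameters, which pylint expects to be lower case:
    @pytest.mark.parametrize("enable_usmp", [True, False])
    @pytest.mark.parametrize("target_kind", ["c", "llvm"])
    def test_example(enable_usmp, target_kind):
        assert target_kind in ("c", "llvm")
        assert enable_usmp in (True, False)
    ```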
---
 tests/lint/pylint.sh                        |   1 +
 tests/python/relay/aot/test_c_device_api.py |  37 ++--
 tests/python/relay/aot/test_cpp_aot.py      |  34 ++--
 tests/python/relay/aot/test_crt_aot.py      | 294 ++++++++++++++++------------
 tests/python/relay/aot/test_crt_aot_usmp.py |  71 ++++---
 5 files changed, 247 insertions(+), 190 deletions(-)

diff --git a/tests/lint/pylint.sh b/tests/lint/pylint.sh
index b442c33c0f..3e55168f26 100755
--- a/tests/lint/pylint.sh
+++ b/tests/lint/pylint.sh
@@ -21,4 +21,5 @@ python3 -m pylint python/tvm --rcfile="$(dirname "$0")"/pylintrc
 python3 -m pylint vta/python/vta --rcfile="$(dirname "$0")"/pylintrc
 python3 -m pylint tests/python/unittest/test_tvmscript_type.py --rcfile="$(dirname "$0")"/pylintrc
 python3 -m pylint tests/python/contrib/test_cmsisnn --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/relay/aot/*.py --rcfile="$(dirname "$0")"/pylintrc
 
diff --git a/tests/python/relay/aot/test_c_device_api.py b/tests/python/relay/aot/test_c_device_api.py
index b972b0845c..ea5ea4920c 100644
--- a/tests/python/relay/aot/test_c_device_api.py
+++ b/tests/python/relay/aot/test_c_device_api.py
@@ -14,32 +14,38 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""AOT with C Device API Tests"""
 
-import sys
+import re
 from collections import OrderedDict
 
 import numpy as np
 import pytest
-import re
-import tvm.testing
 
+import tvm.testing
 from tvm import relay
 from tvm.ir.module import IRModule
 from tvm.testing.aot import AOTTestModel, generate_ref_data, compile_models
 from tvm.micro.testing.aot_test_utils import AOT_DEFAULT_RUNNER
 
 
[email protected]
-def device_api_main_func():
[email protected](name="device_api_main_func")
+def fixture_device_api_main_func():
+    """Test function generator which generates C Device API calls"""
+
     # Ideally we should have a sample Target registered here
     # but we're going to re-use this for now
     pytest.importorskip("ethosu.vela")
+
+    # pylint: disable=import-outside-toplevel
     import tensorflow as tf
     import tflite.Model
 
     from tests.python.contrib.test_ethosu.infra import create_test_runner, generate_ref_data_tflite
     from tvm.relay.op.contrib.ethosu import partition_for_ethosu
 
+    # pylint: enable=import-outside-toplevel
+
     tf.config.run_functions_eagerly(True)
 
     class Model(tf.Module):
@@ -97,8 +103,9 @@ def device_api_main_func():
     return compile_to_main_func
 
 
[email protected]
-def non_device_api_main_func():
[email protected](name="non_device_api_main_func")
+def fixture_non_device_api_main_func():
+    """Test function generator which does not generate C Device API calls"""
     x = relay.var("x", shape=(10, 10))
     y = relay.var("y", shape=(1, 10))
     func = relay.Function([x, y], relay.multiply(x, y))
@@ -151,7 +158,10 @@ def test_device_api_hooks_unpacked_api(device_api_main_func):
     # We dont need to check exact input and output var names in this test.
     # Hence, using a regex to cover any legal I/O name.
     regex = re.compile(
-        'tir\.tvm_check_return\(0, -1, tir\.call_extern\("tvmgen_default_ethos_u_main_0", \w+, \w+, device_context_ethos_u\)\)'
+        r"tir\.tvm_check_return\("
+        r"0, -1, "
+        r'tir\.call_extern\("tvmgen_default_ethos_u_main_0", '
+        r"\w+, \w+, device_context_ethos_u\)\)"
     )
     assert regex.match(str(main_func.body[1][0][0][1]))
     # Close Device
@@ -171,7 +181,9 @@ def test_device_api_hooks_unpacked_api(device_api_main_func):
 
 
 @pytest.mark.skip(
-    "Skipping this test as this is incorrectly using Arm(R) Ethos(TM)-U NPU 
with packed calling convention which is not supported by the NPU codegen's TIR 
to Runtime Hook. We need to use a different target to test this feature"
+    "Skipping this test as this is incorrectly using Arm(R) Ethos(TM)-U NPU "
+    "with packed calling convention which is not supported by the NPU 
codegen's "
+    "TIR to Runtime Hook. We need to use a different target to test this 
feature"
 )
 def test_device_api_hooks_packed_api(device_api_main_func):
     """Check for Device API hooks with packed internal calls"""
@@ -236,11 +248,12 @@ def test_without_device_api_packed_api(non_device_api_main_func):
     """Test a graph without the Device API with the packed internal calls"""
 
     main_func = non_device_api_main_func(interface_api="packed", use_unpacked_api=False)
+
     assert str(main_func.body) == (
         'tir.tvm_call_cpacked("tvmgen_default_fused_multiply", '
-        "tir.tvm_stack_make_array(x_buffer_var, tir.tvm_stack_make_shape(10, 
10), tir.reinterpret((uint64)0), (uint32)2, float32(0), 0), "
-        "tir.tvm_stack_make_array(y_buffer_var, tir.tvm_stack_make_shape(1, 
10), tir.reinterpret((uint64)0), (uint32)2, float32(0), 0), "
-        "tir.tvm_stack_make_array(output_buffer_var, 
tir.tvm_stack_make_shape(10, 10), tir.reinterpret((uint64)0), (uint32)2, 
float32(0), 0), "
+        "tir.tvm_stack_make_array(x_buffer_var, tir.tvm_stack_make_shape(10, 
10), tir.reinterpret((uint64)0), (uint32)2, float32(0), 0), "  # pylint: 
disable=line-too-long
+        "tir.tvm_stack_make_array(y_buffer_var, tir.tvm_stack_make_shape(1, 
10), tir.reinterpret((uint64)0), (uint32)2, float32(0), 0), "  # pylint: 
disable=line-too-long
+        "tir.tvm_stack_make_array(output_buffer_var, 
tir.tvm_stack_make_shape(10, 10), tir.reinterpret((uint64)0), (uint32)2, 
float32(0), 0), "  # pylint: disable=line-too-long
         "tir.reinterpret((uint64)0))\n"
     )
 
diff --git a/tests/python/relay/aot/test_cpp_aot.py b/tests/python/relay/aot/test_cpp_aot.py
index 04a1111e35..742b681ae6 100644
--- a/tests/python/relay/aot/test_cpp_aot.py
+++ b/tests/python/relay/aot/test_cpp_aot.py
@@ -14,10 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""AOT with C++ Runtime Tests"""
 
 import re
-import sys
 import textwrap
 
 import numpy as np
@@ -28,13 +27,10 @@ from tvm import IRModule
 from tvm import relay
 from tvm.relay import backend, testing
 from tvm.testing.aot import generate_ref_data
-from tvm.micro.testing.aot_test_utils import AOT_DEFAULT_RUNNER
 
 
 def test_error_c_interface():
-    interface_api = "c"
-    use_unpacked_api = False
-    test_runner = AOT_DEFAULT_RUNNER
+    """Checks that an error occurs when using the packed API in combination 
with C interface"""
 
     two = relay.add(relay.const(1), relay.const(1))
     func = relay.Function([], two)
@@ -53,12 +49,11 @@ def test_error_c_interface():
         )
 
 
-enable_usmp = tvm.testing.parameter(True, False)
-target_kind = tvm.testing.parameter("c", "llvm")
-
-
[email protected]("enable_usmp", [True, False])
[email protected]("target_kind", ["c", "llvm"])
 def test_conv2d(enable_usmp, target_kind):
-    RELAY_MODEL = textwrap.dedent(
+    """Tests compilation of convolutions"""
+    relay_model = textwrap.dedent(
         """\
         #[version = "0.0.5"]
         def @main(%data : Tensor[(1, 3, 64, 64), uint8], %weight : Tensor[(3, 3, 5, 5), int8]) {
@@ -86,7 +81,7 @@ def test_conv2d(enable_usmp, target_kind):
         }
     """
     )
-    ir_mod = tvm.parser.fromtext(RELAY_MODEL)
+    ir_mod = tvm.parser.fromtext(relay_model)
 
     main_func = ir_mod["main"]
     shape_dict = {p.name_hint: p.checked_type.concrete_shape for p in main_func.params}
@@ -119,7 +114,10 @@ def test_conv2d(enable_usmp, target_kind):
     assert (runner.get_output(0).asnumpy() == list(ref_outputs.values())[0]).all()
 
 
[email protected]("enable_usmp", [True, False])
[email protected]("target_kind", ["c", "llvm"])
 def test_mobilenet(enable_usmp, target_kind):
+    """Full network test with Mobilenet"""
     ir_mod, params = testing.mobilenet.get_workload(batch_size=1)
     data_shape = [int(x) for x in ir_mod["main"].checked_type.arg_types[0].shape]
     data = np.random.uniform(size=data_shape).astype("float32")
@@ -147,10 +145,11 @@ def test_mobilenet(enable_usmp, target_kind):
 
 
 def test_module_list():
-    x = tvm.relay.var("x", tvm.relay.TensorType([1], dtype="float32"))
-    expr = tvm.relay.add(x, tvm.relay.Constant(tvm.nd.array(np.array([1], dtype="float32"))))
+    """Checks the correct list of module names is generated"""
+    input_x = tvm.relay.var("x", tvm.relay.TensorType([1], dtype="float32"))
+    expr = tvm.relay.add(input_x, tvm.relay.Constant(tvm.nd.array(np.array([1], dtype="float32"))))
     mod = tvm.relay.build(
-        tvm.IRModule.from_expr(tvm.relay.Function([x], expr)),
+        tvm.IRModule.from_expr(tvm.relay.Function([input_x], expr)),
         target="c",
         executor=tvm.relay.backend.Executor("aot", {"interface-api": "packed"}),
         mod_name="unusual_module_name_fred",
@@ -177,6 +176,7 @@ def test_create_executor():
 
 
 def test_pass_wrong_device_arg():
+    """Ensure an error is generated if the incorrect number of devices are 
passed"""
     x = tvm.relay.var("x", tvm.relay.TensorType([1], dtype="float32"))
     expr = tvm.relay.add(x, tvm.relay.Constant(tvm.nd.array(np.array([1], dtype="float32"))))
     with tvm.transform.PassContext(opt_level=3, config={"tir.disable_vectorize": True}):
@@ -191,12 +191,12 @@ def test_pass_wrong_device_arg():
     mod.export_library(test_so_path, cc="gcc", options=["-std=c11"])
     loaded_mod = tvm.runtime.load_module(test_so_path)
 
-    with pytest.raises(tvm.TVMError) as cm:
+    with pytest.raises(tvm.TVMError) as error:
         tvm.runtime.executor.AotModule(loaded_mod["default"](tvm.cpu(0), tvm.cpu(0)))
 
         assert (
             "Check failed: devices_.size() == 1 (2 vs. 1) : Expect exactly 1 
device passed."
-            in str(cm.exception)
+            in str(error.exception)
         )
     # TODO write asserts for # and type of device.
 
diff --git a/tests/python/relay/aot/test_crt_aot.py b/tests/python/relay/aot/test_crt_aot.py
index f4ef8d7845..1a4f23ad46 100644
--- a/tests/python/relay/aot/test_crt_aot.py
+++ b/tests/python/relay/aot/test_crt_aot.py
@@ -14,11 +14,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""AOT with C Runtime Tests"""
 
 from collections import OrderedDict
-import platform
 import re
-import sys
 import os
 import tarfile
 import pathlib
@@ -48,6 +47,7 @@ from tvm.micro.testing.aot_test_utils import AOT_DEFAULT_RUNNER, parametrize_aot
 
 
 def test_error_c_interface_with_packed_api():
+    """Checks that an error occurs when using the packed API in combination 
with C interface"""
     interface_api = "c"
     use_unpacked_api = False
     test_runner = AOT_DEFAULT_RUNNER
@@ -75,7 +75,8 @@ def test_error_c_interface_with_packed_api():
 
 @parametrize_aot_options
 def test_conv_with_params(interface_api, use_unpacked_api, test_runner):
-    RELAY_MODEL = """
+    """Tests compilation of convolution with parameters"""
+    relay_model = """
 #[version = "0.0.5"]
 def @main(%data : Tensor[(1, 3, 64, 64), uint8], %weight : Tensor[(8, 3, 5, 5), int8]) {
     %1 = nn.conv2d(
@@ -90,7 +91,7 @@ def @main(%data : Tensor[(1, 3, 64, 64), uint8], %weight : Tensor[(8, 3, 5, 5),
   %1
 }
 """
-    mod = tvm.parser.fromtext(RELAY_MODEL)
+    mod = tvm.parser.fromtext(relay_model)
     main_func = mod["main"]
     shape_dict = {p.name_hint: p.checked_type.concrete_shape for p in main_func.params}
     type_dict = {p.name_hint: p.checked_type.dtype for p in main_func.params}
@@ -112,16 +113,17 @@ def @main(%data : Tensor[(1, 3, 64, 64), uint8], %weight : Tensor[(8, 3, 5, 5),
 
 @parametrize_aot_options
 def test_add_with_params(interface_api, use_unpacked_api, test_runner):
-    x = relay.var("x", shape=(1, 10))
-    y = relay.var("y", shape=(1, 10))
-    z = relay.add(x, y)
-    func = relay.Function([x, y], z)
+    """Tests compilation of add with parameters"""
+    input_x = relay.var("x", shape=(1, 10))
+    input_y = relay.var("y", shape=(1, 10))
+    input_z = relay.add(input_x, input_y)
+    func = relay.Function([input_x, input_y], input_z)
 
-    x_in = np.ones((1, 10)).astype("float32")
-    y_in = np.random.uniform(size=(1, 10)).astype("float32")
+    input_x_data = np.ones((1, 10)).astype("float32")
+    input_y_data = np.random.uniform(size=(1, 10)).astype("float32")
 
-    params = {"x": x_in}
-    inputs = {"y": y_in}
+    params = {"x": input_x_data}
+    inputs = {"y": input_y_data}
     output_list = generate_ref_data(func, inputs, params)
 
     compile_and_run(
@@ -231,21 +233,23 @@ def test_packed_global_variables():
                     # Collect all functions starting with tvmgen_default
                     tvmgen_funcs += re.findall(r"(?<=).*(?=\()", item)
 
-        # Check if any function name has a packed variable name in all items that start with tvmgen_default
+        # Check if any function name has a packed variable name in all
+        # items that start with tvmgen_default
         for func in tvmgen_funcs:
             assert f"{func}_packed" not in tvmgen_names
 
 
 @parametrize_aot_options
 def test_concatenate(interface_api, use_unpacked_api, test_runner):
+    """Tests compilation of concatenate"""
     dtype = "float32"
-    x = relay.var("x", shape=(10, 5), dtype=dtype)
-    y = relay.var("y", shape=(10, 5), dtype=dtype)
-    t = relay.var("z", shape=(), dtype=dtype)
-    z = relay.concatenate((x, y), axis=1)
-    z = relay.add(z, t)
+    input_x = relay.var("x", shape=(10, 5), dtype=dtype)
+    input_y = relay.var("y", shape=(10, 5), dtype=dtype)
+    input_z = relay.var("z", shape=(), dtype=dtype)
+    concat_inputs = relay.concatenate((input_x, input_y), axis=1)
+    func_output = relay.add(input_z, concat_inputs)
     # Check result.
-    func = relay.Function([x, y, t], z)
+    func = relay.Function([input_x, input_y, input_z], func_output)
     x_data = np.random.rand(10, 5).astype(dtype)
     y_data = np.random.rand(10, 5).astype(dtype)
     t_data = np.random.uniform(size=()).astype(dtype)
@@ -262,13 +266,16 @@ def test_concatenate(interface_api, use_unpacked_api, test_runner):
 
 @parametrize_aot_options
 def test_nested_tuples(interface_api, use_unpacked_api, test_runner):
-    x = relay.var("x", shape=(10,))
-    x1 = x + relay.const(1.0)
-    x2 = x1 + relay.const(1.0)
-    x3 = x2 + relay.const(1.0)
-    x4 = x3 + relay.const(1.0)
-    out = relay.Tuple([x1, relay.Tuple([relay.Tuple([x2, x3]), x4])])
-    func = relay.Function([x], out)
+    """Tests compilation of functions with nested tuple outputs"""
+    input_x = relay.var("x", shape=(10,))
+    output_1 = input_x + relay.const(1.0)
+    output_2 = output_1 + relay.const(1.0)
+    output_3 = output_2 + relay.const(1.0)
+    output_4 = output_3 + relay.const(1.0)
+    full_output = relay.Tuple(
+        [output_1, relay.Tuple([relay.Tuple([output_2, output_3]), output_4])]
+    )
+    func = relay.Function([input_x], full_output)
 
     x_data = np.random.uniform(size=(10,)).astype(np.float32)
     inputs = {"x": x_data}
@@ -326,7 +333,8 @@ def test_add_const(interface_api, use_unpacked_api, test_runner):
 
 
 @parametrize_aot_options
-def test_mul_param(interface_api, use_unpacked_api, test_runner):
+def test_multiply(interface_api, use_unpacked_api, test_runner):
+    """Tests compilation of multiply"""
     x = relay.var("x", shape=(10, 10))
     y = relay.var("y", shape=(1, 10))
     func = relay.Function([x, y], relay.multiply(x, y))
@@ -362,6 +370,7 @@ def test_subtract(interface_api, use_unpacked_api, test_runner):
 
 @parametrize_aot_options
 def test_tuple_output(interface_api, use_unpacked_api, test_runner):
+    """Tests getting items from tuples"""
     x = relay.var("x", shape=(6, 9))
     y = relay.split(x, 3).astuple()
     a = relay.TupleGetItem(y, 0)
@@ -383,6 +392,7 @@ def test_tuple_output(interface_api, use_unpacked_api, test_runner):
     ["debug_calculated_workspaces", "workspace_byte_alignment"], [(True, 1), (True, 16), (False, 1)]
 )
 def test_mobilenet(debug_calculated_workspaces, workspace_byte_alignment):
+    """Full network test with Mobilenet"""
     use_unpacked_api = True
     interface_api = "c"
     test_runner = AOT_DEFAULT_RUNNER
@@ -413,33 +423,36 @@ def test_mobilenet(debug_calculated_workspaces, workspace_byte_alignment):
 
 @pytest.mark.parametrize("merge_compiler_regions", [False, True])
 def test_byoc_microtvm(merge_compiler_regions):
-    """This is a simple test to check BYOC capabilities of AOT - with and 
without merging compiler regions to test for 
https://github.com/apache/tvm/issues/9036""";
+    """
+    This is a simple test to check BYOC capabilities of AOT
+    with and without merging compiler regions to test for 
https://github.com/apache/tvm/issues/9036
+    """
     use_unpacked_api = False
     interface_api = "packed"
     test_runner = AOT_DEFAULT_RUNNER
 
-    x = relay.var("x", shape=(10, 10))
-    w0 = relay.var("w0", shape=(10, 10))
-    w1 = relay.var("w1", shape=(10, 10))
+    input_x = relay.var("x", shape=(10, 10))
+    input_w0 = relay.var("w0", shape=(10, 10))
+    input_w1 = relay.var("w1", shape=(10, 10))
 
     # z0 = x + w0
-    x_ = compiler_begin(x, "ccompiler")
-    w0_ = compiler_begin(w0, "ccompiler")
-    z0_ = relay.add(x_, w0_)
-    z0 = compiler_end(z0_, "ccompiler")
+    marked_input_x = compiler_begin(input_x, "ccompiler")
+    marked_input_w0 = compiler_begin(input_w0, "ccompiler")
+    add_x_and_w0 = relay.add(marked_input_x, marked_input_w0)
+    end_inner_add = compiler_end(add_x_and_w0, "ccompiler")
 
     # z1 = z0 + w1
-    z0__ = compiler_begin(z0, "ccompiler")
-    w1_ = compiler_begin(w1, "ccompiler")
-    z1_ = relay.add(z0__, w1_)
-    z1 = compiler_end(z1_, "ccompiler")
+    marked_inner_add = compiler_begin(end_inner_add, "ccompiler")
+    marked_w1 = compiler_begin(input_w1, "ccompiler")
+    add_nested_and_w1 = relay.add(marked_inner_add, marked_w1)
+    end_outer_add = compiler_end(add_nested_and_w1, "ccompiler")
 
     # z2 = z0 + z1
-    z2 = relay.add(z0, z1)
+    final_add = relay.add(end_inner_add, end_outer_add)
 
-    f = relay.Function([x, w0, w1], z2)
+    relay_func = relay.Function([input_x, input_w0, input_w1], final_add)
     mod = tvm.IRModule()
-    mod["main"] = f
+    mod["main"] = relay_func
 
     if merge_compiler_regions:
         mod = transform.MergeCompilerRegions()(mod)
@@ -467,34 +480,37 @@ def test_byoc_microtvm_multiple_subgraphs(merge_compiler_regions):
     interface_api = "packed"
     test_runner = AOT_DEFAULT_RUNNER
 
-    x = relay.var("x", shape=(10, 10))
-    w0 = relay.var("w0", shape=(10, 10))
-    w1 = relay.var("w1", shape=(10, 10))
-    w2 = relay.var("w2", shape=(10, 10))
-    w3 = relay.var("w3", shape=(10, 10))
-    w4 = relay.var("w4", shape=(10, 10))
-    w5 = relay.var("w5", shape=(10, 10))
-    w6 = relay.var("w6", shape=(10, 10))
-    w7 = relay.var("w7", shape=(10, 10))
+    input_x = relay.var("x", shape=(10, 10))
+    input_w0 = relay.var("w0", shape=(10, 10))
+    input_w1 = relay.var("w1", shape=(10, 10))
+    input_w2 = relay.var("w2", shape=(10, 10))
+    input_w3 = relay.var("w3", shape=(10, 10))
+    input_w4 = relay.var("w4", shape=(10, 10))
+    input_w5 = relay.var("w5", shape=(10, 10))
+    input_w6 = relay.var("w6", shape=(10, 10))
+    input_w7 = relay.var("w7", shape=(10, 10))
 
     # C compiler
-    z0 = relay.add(x, w0)
-    p0 = relay.subtract(z0, w1)
-    q0 = relay.multiply(p0, w2)
+    ccompiler_add_1 = relay.add(input_x, input_w0)
+    ccompiler_sub_1 = relay.subtract(ccompiler_add_1, input_w1)
+    ccompiler_mul_1 = relay.multiply(ccompiler_sub_1, input_w2)
 
-    z1 = relay.add(x, w3)
-    p1 = relay.subtract(z1, w4)
-    q1 = relay.multiply(p1, w5)
+    ccompiler_add_2 = relay.add(input_x, input_w3)
+    ccompiler_sub_2 = relay.subtract(ccompiler_add_2, input_w4)
+    ccompiler_mul_2 = relay.multiply(ccompiler_sub_2, input_w5)
 
     # Other parts on TVM
-    z2 = relay.add(x, w6)
-    q2 = relay.subtract(z2, w7)
+    tvm_add = relay.add(input_x, input_w6)
+    tvm_sub = relay.subtract(tvm_add, input_w7)
 
-    r = relay.concatenate((q0, q1, q2), axis=0)
-    f = relay.Function([x, w0, w1, w2, w3, w4, w5, w6, w7], r)
+    concat_outputs = relay.concatenate((ccompiler_mul_1, ccompiler_mul_2, tvm_sub), axis=0)
+    relay_func = relay.Function(
+        [input_x, input_w0, input_w1, input_w2, input_w3, input_w4, input_w5, input_w6, input_w7],
+        concat_outputs,
+    )
     mod = tvm.IRModule()
     ann = byoc.CcompilerAnnotator()
-    mod["main"] = ann.visit(f)
+    mod["main"] = ann.visit(relay_func)
 
     if merge_compiler_regions:
         mod = transform.MergeCompilerRegions()(mod)
@@ -521,22 +537,23 @@ def test_byoc_microtvm_multiple_subgraphs(merge_compiler_regions):
 
 @parametrize_aot_options
 def test_add_name_mangling_with_params(interface_api, use_unpacked_api, test_runner):
-    x = relay.var("x", shape=(1, 10))
-    y = relay.var("y", shape=(1, 10))
-    z = relay.add(x, y)
-    func = relay.Function([x, y], z)
+    """Checks name mangling works with parameters"""
+    input_x = relay.var("x", shape=(1, 10))
+    input_y = relay.var("y", shape=(1, 10))
+    func_add = relay.add(input_x, input_y)
+    relay_func = relay.Function([input_x, input_y], func_add)
 
     x_in = np.ones((1, 10)).astype("float32")
     y_in = np.random.uniform(size=(1, 10)).astype("float32")
 
     params = {"x": x_in}
     inputs = {"y": y_in}
-    output_list = generate_ref_data(func, inputs, params)
+    output_list = generate_ref_data(relay_func, inputs, params)
 
     compile_and_run(
         AOTTestModel(
             name="my_mod",
-            module=func,
+            module=relay_func,
             inputs=inputs,
             outputs=output_list,
             params=params,
@@ -549,6 +566,7 @@ def test_add_name_mangling_with_params(interface_api, use_unpacked_api, test_run
 
 @parametrize_aot_options
 def test_multiple_models(interface_api, use_unpacked_api, test_runner):
+    """Compiles multiple models to ensure both can be compiled into one 
output"""
     # Identity model without params
     x = relay.var("x", "float32")
     mod1 = relay.Function([x], x)
@@ -558,22 +576,23 @@ def test_multiple_models(interface_api, use_unpacked_api, test_runner):
     params1 = None
 
     # Convolution model
-    RELAY_MODEL = """
-#[version = "0.0.5"]
-def @main(%data : Tensor[(1, 3, 64, 64), uint8], %weight : Tensor[(8, 3, 5, 5), int8]) {
-    %1 = nn.conv2d(
-         %data,
-         %weight,
-         padding=[2, 2],
-         channels=8,
-         kernel_size=[5, 5],
-         data_layout="NCHW",
-         kernel_layout="OIHW",
-         out_dtype="int32");
-  %1
-}
-"""
-    mod2 = tvm.parser.fromtext(RELAY_MODEL)
+    relay_model = """
+    #[version = "0.0.5"]
+    def @main(%data : Tensor[(1, 3, 64, 64), uint8], %weight : Tensor[(8, 3, 5, 5), int8]) {
+        %1 = nn.conv2d(
+            %data,
+            %weight,
+            padding=[2, 2],
+            channels=8,
+            kernel_size=[5, 5],
+            data_layout="NCHW",
+            kernel_layout="OIHW",
+            out_dtype="int32");
+    %1
+    }
+    """
+
+    mod2 = tvm.parser.fromtext(relay_model)
     main_func = mod2["main"]
     shape_dict = {p.name_hint: p.checked_type.concrete_shape for p in main_func.params}
     type_dict = {p.name_hint: p.checked_type.dtype for p in main_func.params}
@@ -609,12 +628,14 @@ def @main(%data : Tensor[(1, 3, 64, 64), uint8], %weight : Tensor[(8, 3, 5, 5),
 
 
 def test_quant_mobilenet_tfl():
-    """Since in AOT we pass directly the output buffer from the user, in 
quantized networks sharing the output buffers is not possible.
-    This is because the output data type is int8 and the intermediate buffer 
are int32 or int16. We use mobilenet quantized to stress this
+    """Since in AOT we pass directly the output buffer from the user,
+    in quantized networks sharing the output buffers is not possible.
+    This is because the output data type is int8 and the intermediate
+    buffer are int32 or int16. We use mobilenet quantized to stress this
     situation and verify that the output buffer sharing is disabled in AOT."""
     pytest.importorskip("tflite")
 
-    import tvm.relay.testing.tf as tf_testing
+    import tvm.relay.testing.tf as tf_testing  # pylint: disable=import-outside-toplevel
 
     use_unpacked_api = True
     interface_api = "c"
@@ -640,22 +661,22 @@ def test_transpose(interface_api, use_unpacked_api, test_runner):
     """Test that non-inpleaceable operations (e.g., transpose) do not happen in-place."""
 
     dtype = "float32"
-    x = relay.var("x", shape=(10, 5), dtype=dtype)
-    y = relay.var("y", shape=(10, 5), dtype=dtype)
-    t = relay.var("z", shape=(), dtype=dtype)
-    a = relay.add(x, y)
-    b = relay.transpose(a)
-    z = relay.add(b, t)
+    input_x = relay.var("x", shape=(10, 5), dtype=dtype)
+    input_y = relay.var("y", shape=(10, 5), dtype=dtype)
+    input_z = relay.var("z", shape=(), dtype=dtype)
+    first_add = relay.add(input_x, input_y)
+    transpose_add = relay.transpose(first_add)
+    final_add = relay.add(transpose_add, input_z)
     # Check result.
-    func = relay.Function([x, y, t], z)
+    relay_func = relay.Function([input_x, input_y, input_z], final_add)
     x_data = np.random.rand(10, 5).astype(dtype)
     y_data = np.random.rand(10, 5).astype(dtype)
     t_data = np.random.uniform(size=()).astype(dtype)
 
     inputs = {"x": x_data, "y": y_data, "z": t_data}
-    output_list = generate_ref_data(func, inputs)
+    output_list = generate_ref_data(relay_func, inputs)
     compile_and_run(
-        AOTTestModel(module=IRModule.from_expr(func), inputs=inputs, outputs=output_list),
+        AOTTestModel(module=IRModule.from_expr(relay_func), inputs=inputs, outputs=output_list),
         test_runner,
         interface_api,
         use_unpacked_api,
@@ -693,15 +714,15 @@ def test_name_sanitiser_name_clash():
     test_runner = AOT_DEFAULT_RUNNER
 
     dtype = "float32"
-    x = relay.var("input::-1", shape=(10, 5), dtype=dtype)
+    input_non_clashing = relay.var("input::-1", shape=(10, 5), dtype=dtype)
     # Next 2 input tensor names will clash once sanitized.
-    y = relay.var("input::-2", shape=(10, 5), dtype=dtype)
-    t = relay.var("input:--2", shape=(), dtype=dtype)
-    a = relay.add(x, y)
-    b = relay.transpose(a)
-    z = relay.add(b, t)
+    input_clashing_1 = relay.var("input::-2", shape=(10, 5), dtype=dtype)
+    input_clashing_2 = relay.var("input:--2", shape=(), dtype=dtype)
+    inner_add = relay.add(input_non_clashing, input_clashing_1)
+    transpose_add = relay.transpose(inner_add)
+    final_add = relay.add(transpose_add, input_clashing_2)
     # Check result.
-    func = relay.Function([x, y, t], z)
+    func = relay.Function([input_non_clashing, input_clashing_1, input_clashing_2], final_add)
     x_data = np.random.rand(10, 5).astype(dtype)
     y_data = np.random.rand(10, 5).astype(dtype)
     t_data = np.random.uniform(size=()).astype(dtype)
@@ -721,17 +742,17 @@ def test_name_sanitiser_name_clash():
 
 # This tests for deprecated AOT executor arguments
 # TODO(Mousius) Remove deprecated arguments later
-def test_deprecated_target_arguments(capsys):
+def test_deprecated_target_arguments():
     """Tests we can still use relay.build with -executor, -runtime and 
-link-params"""
 
     interface_api = "c"
     use_unpacked_api = True
     test_runner = AOT_DEFAULT_RUNNER
 
-    x = relay.var("x", shape=(1, 10))
-    y = relay.var("y", shape=(1, 10))
-    z = relay.add(x, y)
-    func = relay.Function([x, y], z)
+    input_x = relay.var("x", shape=(1, 10))
+    input_y = relay.var("y", shape=(1, 10))
+    func_add = relay.add(input_x, input_y)
+    func = relay.Function([input_x, input_y], func_add)
 
     x_in = np.ones((1, 10)).astype("float32")
     y_in = np.random.uniform(size=(1, 10)).astype("float32")
@@ -761,6 +782,7 @@ def test_aot_codegen_backend_alloc_workspace_calls():
     # The %data and %weight shapes in the following primitive Relay should create
     # small tensors that would get lowered to stack allocations in the CPU PrimFuncs.
     # However, the AoT executor codegen should retain them as TVMBAW calls
+    # pylint: disable=line-too-long
     relay_mod = tvm.parser.fromtext(
         """
         #[version = "0.0.5"]
@@ -784,6 +806,8 @@ def test_aot_codegen_backend_alloc_workspace_calls():
         }
         """
     )
+    # pylint: enable=line-too-long
+
     compiled_test_mods = compile_models(
         models=AOTTestModel(module=relay_mod, inputs=None, outputs=None),
         interface_api="c",
@@ -822,10 +846,12 @@ def test_output_tensor_names():
     """Test that the output names generated match those in the model"""
     pytest.importorskip("tflite")
 
-    import os
+    # pylint: disable=import-outside-toplevel
     import tensorflow as tf
     import tflite.Model
 
+    # pylint: enable=import-outside-toplevel
+
     ifm_shape = (1, 299, 299, 3)
     padding = "VALID"
     strides = (1, 1)
@@ -836,38 +862,40 @@ def test_output_tensor_names():
         """Create a model with 2 output tensors"""
 
         class Model(tf.Module):
+            """Simple TFLite test model"""
+
             @tf.function
-            def tf_function(self, x):
-                # Use tf.nn API to create the model
+            def tf_function(self, tf_input_x):
+                """Single TFLite function with two convolutions"""
                 tf_strides = [1, strides[0], strides[1], 1]
                 filter_shape = [kernel_shape[0], kernel_shape[1], 3, 3]
                 filter1 = tf.constant(
                     np.arange(np.prod(filter_shape)).reshape(filter_shape),
                     dtype=tf.float32,
                 )
-                op = tf.nn.conv2d(
-                    x,
+                first_conv2d = tf.nn.conv2d(
+                    tf_input_x,
                     filters=filter1,
                     strides=tf_strides,
                     padding=padding,
                     dilations=dilation,
                 )
-                op = tf.nn.relu(op)
-                # Second convolution
+                first_conv2d = tf.nn.relu(first_conv2d)
+
                 filter2 = tf.constant(
                     1000 + np.arange(np.prod(filter_shape)).reshape(filter_shape),
                     dtype=tf.float32,
                 )
-                op2 = tf.nn.conv2d(
-                    x,
+                second_conv2d = tf.nn.conv2d(
+                    tf_input_x,
                     filters=filter2,
                     strides=strides,
                     padding=padding,
                     data_format="NHWC",
                     dilations=dilation,
                 )
-                op2 = tf.nn.relu(op2)
-                return op, op2
+                second_conv2d = tf.nn.relu(second_conv2d)
+                return first_conv2d, second_conv2d
 
         model = Model()
         concrete_func = model.tf_function.get_concrete_function(
@@ -934,6 +962,7 @@ def test_output_tensor_names():
     ],
 )
 def test_workspace_calculation(workspace_byte_alignment, main_workspace_size):
+    """Checks calculated workspace against known values"""
     mod, params = tvm.relay.testing.synthetic.get_workload()
     target = "c"
     runtime = Runtime("crt")
@@ -964,9 +993,12 @@ def test_workspace_calculation_cmsis_nn():
     -hierarchical manner."""
     pytest.importorskip("tflite")
 
+    # pylint: disable=import-outside-toplevel
     from tvm.relay.op.contrib import cmsisnn
     from tvm.contrib.download import download_testdata
 
+    # pylint: enable=import-outside-toplevel
+
     target = "c"
     runtime = Runtime("crt")
     executor = Executor(
@@ -978,7 +1010,11 @@ def test_workspace_calculation_cmsis_nn():
         },
     )
 
-    base_url = "https://github.com/ARM-software/ML-zoo/raw/48a22ee22325d15d2371a6df24eb7d67e21dcc97/models/keyword_spotting/cnn_small/tflite_int8"
+    base_url = (
+        "https://github.com/ARM-software/ML-zoo/raw/";
+        "48a22ee22325d15d2371a6df24eb7d67e21dcc97"
+        "/models/keyword_spotting/cnn_small/tflite_int8"
+    )
     file_to_download = "cnn_s_quantized.tflite"
     file_saved = "cnn_s_quantized_15Dec2021.tflite"
     model_file = download_testdata("{}/{}".format(base_url, file_to_download), file_saved)
@@ -997,10 +1033,10 @@ def test_workspace_calculation_cmsis_nn():
 
 def test_aot_codegen_checks_returns():
     """This test checks whether AoT lowering creates calls that check the 
return value correctly"""
-    x = relay.var("x", shape=(1, 10))
-    y = relay.var("y", shape=(1, 10))
-    z = relay.add(x, y)
-    func = relay.Function([x, y], z)
+    input_x = relay.var("x", shape=(1, 10))
+    input_y = relay.var("y", shape=(1, 10))
+    func_add = relay.add(input_x, input_y)
+    func = relay.Function([input_x, input_y], func_add)
 
     compiled_test_mods = compile_models(
         models=AOTTestModel(module=IRModule.from_expr(func), inputs=None, outputs=None),
@@ -1021,17 +1057,17 @@ def test_aot_codegen_checks_returns():
     )
     # TODO(Mousius) - Create a better place for C codegen tests
     assert (
-        "if (tvmgen_default_fused_add(x_buffer_var, y_buffer_var, 
output_buffer_var) != 0 ) return -1;"
+        "if (tvmgen_default_fused_add(x_buffer_var, y_buffer_var, 
output_buffer_var) != 0 ) return -1;"  # pylint: disable=line-too-long
         in source
     )
 
 
 def test_aot_uses_anf():
     """Checks that A-Normal Form is being used in the AOT lowering pipeline."""
-    x = relay.var("x", shape=(1, 10, 10, 10))
-    y = relay.var("y", shape=(1, 10, 10, 10))
-    z = relay.add(x, y)
-    func = relay.Function([x, y], z)
+    input_x = relay.var("x", shape=(1, 10, 10, 10))
+    input_y = relay.var("y", shape=(1, 10, 10, 10))
+    func_add = relay.add(input_x, input_y)
+    func = relay.Function([input_x, input_y], func_add)
 
     @pass_instrument
     class CheckANFRuns:
diff --git a/tests/python/relay/aot/test_crt_aot_usmp.py b/tests/python/relay/aot/test_crt_aot_usmp.py
index 3ede229887..4205b45817 100644
--- a/tests/python/relay/aot/test_crt_aot_usmp.py
+++ b/tests/python/relay/aot/test_crt_aot_usmp.py
@@ -17,17 +17,14 @@
 """ This file contains test that use USMP + AoT using C runtime APIs"""
 
 from collections import OrderedDict
-import sys
 import re
 
 import numpy as np
 import pytest
 
 import tvm
-from tvm import relay, TVMError
-from tvm.ir.module import IRModule
-from tvm.relay import testing, transform
-from tvm.relay.testing import byoc
+from tvm import relay
+from tvm.relay import transform
 from tvm.relay.op.annotation import compiler_begin, compiler_end
 from tvm.relay.backend import Executor, Runtime
 from tvm import WorkspaceMemoryPools, PoolInfo
@@ -47,7 +44,7 @@ from tvm.testing.usmp import is_tvm_backendallocworkspace_calls
 
 def _check_for_no_tvm_backendallocworkspace_calls(mod: tvm.runtime.module):
     assert (
-        is_tvm_backendallocworkspace_calls(mod) == False
+        is_tvm_backendallocworkspace_calls(mod) is False
     ), "This is failing because USMP was unable to plan for every tir.allocate 
node."
 
 
@@ -60,6 +57,7 @@ def _check_for_no_tvm_backendallocworkspace_calls(mod: tvm.runtime.module):
     ],
 )
 def test_memory_planning(workspace_byte_alignment, main_workspace_size):
+    """Checks calculated workspace against known values"""
     mod, params = tvm.relay.testing.synthetic.get_workload()
     target = "c"
     runtime = Runtime("crt")
@@ -141,33 +139,36 @@ def test_conv2d(interface_api, use_unpacked_api, test_runner, groups, weight_sha
 
 @pytest.mark.parametrize("merge_compiler_regions", [False, True])
 def test_byoc_microtvm(merge_compiler_regions):
-    """This is a simple test to check BYOC capabilities of AOT - with and 
without merging compiler regions to test for 
https://github.com/apache/tvm/issues/9036""";
+    """
+    This is a simple test to check BYOC capabilities of AOT
+    with and without merging compiler regions to test for 
https://github.com/apache/tvm/issues/9036
+    """
     use_unpacked_api = False
     interface_api = "packed"
     test_runner = AOTTestRunner(pass_config={"tir.usmp.enable": True})
 
-    x = relay.var("x", shape=(10, 10))
-    w0 = relay.var("w0", shape=(10, 10))
-    w1 = relay.var("w1", shape=(10, 10))
+    input_x = relay.var("x", shape=(10, 10))
+    input_w0 = relay.var("w0", shape=(10, 10))
+    input_w1 = relay.var("w1", shape=(10, 10))
 
     # z0 = x + w0
-    x_ = compiler_begin(x, "ccompiler")
-    w0_ = compiler_begin(w0, "ccompiler")
-    z0_ = relay.add(x_, w0_)
-    z0 = compiler_end(z0_, "ccompiler")
+    marked_input_x = compiler_begin(input_x, "ccompiler")
+    marked_input_w0 = compiler_begin(input_w0, "ccompiler")
+    add_x_and_w0 = relay.add(marked_input_x, marked_input_w0)
+    end_inner_add = compiler_end(add_x_and_w0, "ccompiler")
 
     # z1 = z0 + w1
-    z0__ = compiler_begin(z0, "ccompiler")
-    w1_ = compiler_begin(w1, "ccompiler")
-    z1_ = relay.add(z0__, w1_)
-    z1 = compiler_end(z1_, "ccompiler")
+    marked_inner_add = compiler_begin(end_inner_add, "ccompiler")
+    marked_w1 = compiler_begin(input_w1, "ccompiler")
+    add_nested_and_w1 = relay.add(marked_inner_add, marked_w1)
+    end_outer_add = compiler_end(add_nested_and_w1, "ccompiler")
 
     # z2 = z0 + z1
-    z2 = relay.add(z0, z1)
+    final_add = relay.add(end_inner_add, end_outer_add)
 
-    f = relay.Function([x, w0, w1], z2)
+    relay_func = relay.Function([input_x, input_w0, input_w1], final_add)
     mod = tvm.IRModule()
-    mod["main"] = f
+    mod["main"] = relay_func
 
     if merge_compiler_regions:
         mod = transform.MergeCompilerRegions()(mod)
@@ -199,11 +200,13 @@ def test_byoc_microtvm(merge_compiler_regions):
 
 
 MOBILENET_V1_URL = (
-    "https://storage.googleapis.com/download.tensorflow.org/models/mobilenet_v1_2018_08_02/mobilenet_v1_1.0_224_quant.tgz",
+    "https://storage.googleapis.com/download.tensorflow.org/models/"
+    + "mobilenet_v1_2018_08_02/mobilenet_v1_1.0_224_quant.tgz",
     "mobilenet_v1_1.0_224_quant.tflite",
 )
 MOBILENET_V2_URL = (
-    "https://storage.googleapis.com/download.tensorflow.org/models/tflite_11_05_08/mobilenet_v2_1.0_224_quant.tgz",
+    "https://storage.googleapis.com/download.tensorflow.org/models/"
+    + "tflite_11_05_08/mobilenet_v2_1.0_224_quant.tgz",
     "mobilenet_v2_1.0_224_quant.tflite",
 )
 
@@ -217,10 +220,13 @@ MOBILENET_V2_URL = (
     ],
 )
 def test_tflite_model_u1_usecase(model_url, usmp_algo, workspace_size):
-    """This checks for ML models and the memory used by them when using USMP 
with different algorithms"""
+    """
+    This checks for ML models and the memory used by them
+    when using USMP with different algorithms
+    """
     pytest.importorskip("tflite")
 
-    import tvm.relay.testing.tf as tf_testing
+    import tvm.relay.testing.tf as tf_testing  # pylint: disable=import-outside-toplevel
 
     use_unpacked_api = True
     interface_api = "c"
@@ -287,7 +293,7 @@ def test_tflite_model_u3_usecase_single_external_pool(model_url, usmp_algo):
     """This checks for inference with USMP using external pool placed in the application"""
     pytest.importorskip("tflite")
 
-    import tvm.relay.testing.tf as tf_testing
+    import tvm.relay.testing.tf as tf_testing  # pylint: disable=import-outside-toplevel
 
     use_unpacked_api = True
     interface_api = "c"
@@ -341,7 +347,7 @@ def test_tflite_model_u3_usecase_two_external_pools(model_url, usmp_algo):
     """This checks for inference using two external pools placed in the application"""
     pytest.importorskip("tflite")
 
-    import tvm.relay.testing.tf as tf_testing
+    import tvm.relay.testing.tf as tf_testing  # pylint: disable=import-outside-toplevel
 
     use_unpacked_api = True
     interface_api = "c"
@@ -397,11 +403,11 @@ def test_tflite_model_u3_usecase_two_external_pools(model_url, usmp_algo):
         ((MOBILENET_V1_URL, MOBILENET_V2_URL), "greedy_by_size"),
     ],
 )
-def test_tflite_model_u2_usecase_two_models_with_a_single_external_pool(model_urls, usmp_algo):
+def test_two_models_with_a_single_external_pool(model_urls, usmp_algo):
     """This checks for inference using a single large enough common pool"""
     pytest.importorskip("tflite")
 
-    import tvm.relay.testing.tf as tf_testing
+    import tvm.relay.testing.tf as tf_testing  # pylint: disable=import-outside-toplevel
 
     use_unpacked_api = True
     interface_api = "c"
@@ -469,7 +475,7 @@ def test_tflite_model_u4_usecase_single_external_pool(model_url, usmp_algo):
     """This checks for inference with USMP using external pool placed in the application"""
     pytest.importorskip("tflite")
 
-    import tvm.relay.testing.tf as tf_testing
+    import tvm.relay.testing.tf as tf_testing  # pylint: disable=import-outside-toplevel
 
     use_unpacked_api = True
     interface_api = "c"
@@ -538,7 +544,7 @@ def test_tflite_model_u4_usecase_two_external_pools(model_url, usmp_algo):
     """This checks for inference with USMP using external pool placed in the application"""
     pytest.importorskip("tflite")
 
-    import tvm.relay.testing.tf as tf_testing
+    import tvm.relay.testing.tf as tf_testing  # pylint: disable=import-outside-toplevel
 
     use_unpacked_api = True
     interface_api = "c"
@@ -604,7 +610,8 @@ def test_tflite_model_u4_usecase_two_external_pools(model_url, usmp_algo):
     )
 
 
-def test_u4_usecase_incompatible_interface_api_errors():
+def test_incompatible_interface_api_errors():
+    """Ensures an error is thrown if not using the C interface API"""
     mod, params = tvm.relay.testing.synthetic.get_workload()
     target = "c"
     runtime = Runtime("crt")
