[ 
https://issues.apache.org/jira/browse/SPARK-26364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720858#comment-16720858
 ] 

ASF GitHub Bot commented on SPARK-26364:
----------------------------------------

asfgit closed pull request #23314: [SPARK-26364][PYTHON][TESTING] Clean up 
imports in test_pandas_udf*
URL: https://github.com/apache/spark/pull/23314
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is reproduced below:

diff --git a/python/pyspark/sql/tests/test_pandas_udf.py 
b/python/pyspark/sql/tests/test_pandas_udf.py
index c4b5478a7e893..d4d9679649ee9 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -17,12 +17,16 @@
 
 import unittest
 
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.sql.utils import ParseException
+from pyspark.rdd import PythonEvalType
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, 
have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest
 
+from py4j.protocol import Py4JJavaError
+
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
@@ -30,9 +34,6 @@
 class PandasUDFTests(ReusedSQLTestCase):
 
     def test_pandas_udf_basic(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         udf = pandas_udf(lambda x: x, DoubleType())
         self.assertEqual(udf.returnType, DoubleType())
         self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
@@ -65,10 +66,6 @@ def test_pandas_udf_basic(self):
         self.assertEqual(udf.evalType, 
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_pandas_udf_decorator(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-        from pyspark.sql.types import StructType, StructField, DoubleType
-
         @pandas_udf(DoubleType())
         def foo(x):
             return x
@@ -114,8 +111,6 @@ def foo(x):
         self.assertEqual(foo.evalType, 
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_udf_wrong_arg(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaises(ParseException):
                 @pandas_udf('blah')
@@ -151,9 +146,6 @@ def foo(k, v, w):
                     return k
 
     def test_stopiteration_in_udf(self):
-        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
-        from py4j.protocol import Py4JJavaError
-
         def foo(x):
             raise StopIteration()
 
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py 
b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 5383704434c85..18264ead2fd08 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -17,6 +17,9 @@
 
 import unittest
 
+from pyspark.rdd import PythonEvalType
+from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
+    udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.sql.utils import AnalysisException
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, 
have_pyarrow, \
@@ -31,7 +34,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
             .withColumn("v", explode(col('vs'))) \
@@ -40,8 +42,6 @@ def data(self):
 
     @property
     def python_plus_one(self):
-        from pyspark.sql.functions import udf
-
         @udf('double')
         def plus_one(v):
             assert isinstance(v, (int, float))
@@ -51,7 +51,6 @@ def plus_one(v):
     @property
     def pandas_scalar_plus_two(self):
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         @pandas_udf('double', PandasUDFType.SCALAR)
         def plus_two(v):
@@ -61,8 +60,6 @@ def plus_two(v):
 
     @property
     def pandas_agg_mean_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def avg(v):
             return v.mean()
@@ -70,8 +67,6 @@ def avg(v):
 
     @property
     def pandas_agg_sum_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def sum(v):
             return v.sum()
@@ -80,7 +75,6 @@ def sum(v):
     @property
     def pandas_agg_weighted_mean_udf(self):
         import numpy as np
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def weighted_mean(v, w):
@@ -88,8 +82,6 @@ def weighted_mean(v, w):
         return weighted_mean
 
     def test_manual(self):
-        from pyspark.sql.functions import pandas_udf, array
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
         mean_udf = self.pandas_agg_mean_udf
@@ -118,8 +110,6 @@ def test_manual(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_basic(self):
-        from pyspark.sql.functions import col, lit, mean
-
         df = self.data
         weighted_mean_udf = self.pandas_agg_weighted_mean_udf
 
@@ -150,9 +140,6 @@ def test_basic(self):
         self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
 
     def test_unsupported_types(self):
-        from pyspark.sql.types import DoubleType, MapType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
                 pandas_udf(
@@ -173,8 +160,6 @@ def mean_and_std_udf(v):
                     return {v.mean(): v.std()}
 
     def test_alias(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         mean_udf = self.pandas_agg_mean_udf
 
@@ -187,8 +172,6 @@ def test_mixed_sql(self):
         """
         Test mixing group aggregate pandas UDF with sql expression.
         """
-        from pyspark.sql.functions import sum
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
 
@@ -225,8 +208,6 @@ def test_mixed_udfs(self):
         """
         Test mixing group aggregate pandas UDF with python UDF and scalar 
pandas UDF.
         """
-        from pyspark.sql.functions import sum
-
         df = self.data
         plus_one = self.python_plus_one
         plus_two = self.pandas_scalar_plus_two
@@ -292,8 +273,6 @@ def test_multiple_udfs(self):
         """
         Test multiple group aggregate pandas UDFs in one agg function.
         """
-        from pyspark.sql.functions import sum, mean
-
         df = self.data
         mean_udf = self.pandas_agg_mean_udf
         sum_udf = self.pandas_agg_sum_udf
@@ -315,8 +294,6 @@ def test_multiple_udfs(self):
         self.assertPandasEqual(expected1, result1)
 
     def test_complex_groupby(self):
-        from pyspark.sql.functions import sum
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
         plus_one = self.python_plus_one
@@ -359,8 +336,6 @@ def test_complex_groupby(self):
         self.assertPandasEqual(expected7.toPandas(), result7.toPandas())
 
     def test_complex_expressions(self):
-        from pyspark.sql.functions import col, sum
-
         df = self.data
         plus_one = self.python_plus_one
         plus_two = self.pandas_scalar_plus_two
@@ -434,7 +409,6 @@ def test_complex_expressions(self):
         self.assertPandasEqual(expected3, result3)
 
     def test_retain_group_columns(self):
-        from pyspark.sql.functions import sum
         with self.sql_conf({"spark.sql.retainGroupColumns": False}):
             df = self.data
             sum_udf = self.pandas_agg_sum_udf
@@ -444,8 +418,6 @@ def test_retain_group_columns(self):
             self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_array_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
 
         array_udf = pandas_udf(lambda x: [1.0, 2.0], 'array<double>', 
PandasUDFType.GROUPED_AGG)
@@ -453,8 +425,6 @@ def test_array_type(self):
         self.assertEquals(result1.first()['v2'], [1.0, 2.0])
 
     def test_invalid_args(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         plus_one = self.python_plus_one
         mean_udf = self.pandas_agg_mean_udf
@@ -478,9 +448,6 @@ def test_invalid_args(self):
                 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
     def test_register_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf
-        from pyspark.rdd import PythonEvalType
-
         sum_pandas_udf = pandas_udf(
             lambda v: v.sum(), "integer", 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
 
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py 
b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index a12c608dff9dd..80e70349b78d3 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -18,7 +18,12 @@
 import datetime
 import unittest
 
+from collections import OrderedDict
+from decimal import Decimal
+from distutils.version import LooseVersion
+
 from pyspark.sql import Row
+from pyspark.sql.functions import array, explode, col, lit, udf, sum, 
pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, 
have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
@@ -32,16 +37,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i) for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))).drop('vs')
 
     def test_supported_types(self):
-        from decimal import Decimal
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         values = [
             1, 2, 3,
@@ -131,8 +132,6 @@ def test_supported_types(self):
         self.assertPandasEqual(expected3, result3)
 
     def test_array_type_correct(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
-
         df = self.data.withColumn("arr", array(col("id"))).repartition(1, "id")
 
         output_schema = StructType(
@@ -151,8 +150,6 @@ def test_array_type_correct(self):
         self.assertPandasEqual(expected, result)
 
     def test_register_grouped_map_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
@@ -161,7 +158,6 @@ def test_register_grouped_map_udf(self):
                 self.spark.catalog.registerFunction("foo_udf", foo_udf)
 
     def test_decorator(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         @pandas_udf(
@@ -176,7 +172,6 @@ def foo(pdf):
         self.assertPandasEqual(expected, result)
 
     def test_coerce(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         foo = pandas_udf(
@@ -191,7 +186,6 @@ def test_coerce(self):
         self.assertPandasEqual(expected, result)
 
     def test_complex_groupby(self):
-        from pyspark.sql.functions import pandas_udf, col, PandasUDFType
         df = self.data
 
         @pandas_udf(
@@ -210,7 +204,6 @@ def normalize(pdf):
         self.assertPandasEqual(expected, result)
 
     def test_empty_groupby(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         @pandas_udf(
@@ -229,7 +222,6 @@ def normalize(pdf):
         self.assertPandasEqual(expected, result)
 
     def test_datatype_string(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         foo_udf = pandas_udf(
@@ -243,8 +235,6 @@ def test_datatype_string(self):
         self.assertPandasEqual(expected, result)
 
     def test_wrong_return_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     NotImplementedError,
@@ -255,7 +245,6 @@ def test_wrong_return_type(self):
                     PandasUDFType.GROUPED_MAP)
 
     def test_wrong_args(self):
-        from pyspark.sql.functions import udf, pandas_udf, sum, PandasUDFType
         df = self.data
 
         with QuietTest(self.sc):
@@ -277,9 +266,7 @@ def test_wrong_args(self):
                     pandas_udf(lambda x, y: x, DoubleType(), 
PandasUDFType.SCALAR))
 
     def test_unsupported_types(self):
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         common_err_msg = 'Invalid returnType.*grouped map Pandas UDF.*'
         unsupported_types = [
@@ -300,7 +287,6 @@ def test_unsupported_types(self):
 
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 
am
         dt = [datetime.datetime(2015, 11, 1, 0, 30),
               datetime.datetime(2015, 11, 1, 1, 30),
@@ -311,12 +297,12 @@ def test_timestamp_dst(self):
         self.assertPandasEqual(df.toPandas(), result.toPandas())
 
     def test_udf_with_key(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
+        import numpy as np
+
         df = self.data
         pdf = df.toPandas()
 
         def foo1(key, pdf):
-            import numpy as np
             assert type(key) == tuple
             assert type(key[0]) == np.int64
 
@@ -326,7 +312,6 @@ def foo1(key, pdf):
                               v4=pdf.v * pdf.id.mean())
 
         def foo2(key, pdf):
-            import numpy as np
             assert type(key) == tuple
             assert type(key[0]) == np.int64
             assert type(key[1]) == np.int32
@@ -385,9 +370,7 @@ def foo3(key, pdf):
         self.assertPandasEqual(expected4, result4)
 
     def test_column_order(self):
-        from collections import OrderedDict
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         # Helper function to set column names from a list
         def rename_pdf(pdf, names):
@@ -468,7 +451,6 @@ def invalid_positional_types(pdf):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                 grouped_df.apply(column_name_typo).collect()
-            from distutils.version import LooseVersion
             import pyarrow as pa
             if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
                 # TODO: see ARROW-1949. Remove when the minimum PyArrow 
version becomes 0.11.0.
@@ -480,7 +462,6 @@ def invalid_positional_types(pdf):
 
     def test_positional_assignment_conf(self):
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         with self.sql_conf({
                 
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
@@ -496,9 +477,7 @@ def foo(_):
                 self.assertEqual(r.b, 1)
 
     def test_self_join_with_pandas(self):
-        import pyspark.sql.functions as F
-
-        @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
+        @pandas_udf('key long, col string', PandasUDFType.GROUPED_MAP)
         def dummy_pandas_udf(df):
             return df[['key', 'col']]
 
@@ -508,12 +487,11 @@ def dummy_pandas_udf(df):
 
         # this was throwing an AnalysisException before SPARK-24208
         res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'),
-                                                 F.col('temp0.key') == 
F.col('temp1.key'))
+                                                 col('temp0.key') == 
col('temp1.key'))
         self.assertEquals(res.count(), 5)
 
     def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
         import pandas as pd
-        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 
         df = self.spark.range(0, 10).toDF('v1')
         df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 2f585a3725988..6a6865a9fb16d 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -16,12 +16,20 @@
 #
 import datetime
 import os
+import random
 import shutil
 import sys
 import tempfile
 import time
 import unittest
 
+from datetime import date, datetime
+from decimal import Decimal
+from distutils.version import LooseVersion
+
+from pyspark.rdd import PythonEvalType
+from pyspark.sql import Column
+from pyspark.sql.functions import array, col, expr, lit, sum, udf, pandas_udf
 from pyspark.sql.types import Row
 from pyspark.sql.types import *
 from pyspark.sql.utils import AnalysisException
@@ -59,18 +67,16 @@ def tearDownClass(cls):
 
     @property
     def nondeterministic_vectorized_udf(self):
-        from pyspark.sql.functions import pandas_udf
+        import pandas as pd
+        import numpy as np
 
         @pandas_udf('double')
         def random_udf(v):
-            import pandas as pd
-            import numpy as np
             return pd.Series(np.random.random(len(v)))
         random_udf = random_udf.asNondeterministic()
         return random_udf
 
     def test_pandas_udf_tokenize(self):
-        from pyspark.sql.functions import pandas_udf
         tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')),
                               ArrayType(StringType()))
         self.assertEqual(tokenize.returnType, ArrayType(StringType()))
@@ -79,7 +85,6 @@ def test_pandas_udf_tokenize(self):
         self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])], 
result.collect())
 
     def test_pandas_udf_nested_arrays(self):
-        from pyspark.sql.functions import pandas_udf
         tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]),
                               ArrayType(ArrayType(StringType())))
         self.assertEqual(tokenize.returnType, 
ArrayType(ArrayType(StringType())))
@@ -88,7 +93,6 @@ def test_pandas_udf_nested_arrays(self):
         self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye', 
u'boo']])], result.collect())
 
     def test_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf, col, array
         df = self.spark.range(10).select(
             col('id').cast('string').alias('str'),
             col('id').cast('int').alias('int'),
@@ -114,9 +118,6 @@ def test_vectorized_udf_basic(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_register_nondeterministic_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf
-        from pyspark.rdd import PythonEvalType
-        import random
         random_pandas_udf = pandas_udf(
             lambda x: random.randint(6, 6) + x, 
IntegerType()).asNondeterministic()
         self.assertEqual(random_pandas_udf.deterministic, False)
@@ -129,7 +130,6 @@ def 
test_register_nondeterministic_vectorized_udf_basic(self):
         self.assertEqual(row[0], 7)
 
     def test_vectorized_udf_null_boolean(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(True,), (True,), (None,), (False,)]
         schema = StructType().add("bool", BooleanType())
         df = self.spark.createDataFrame(data, schema)
@@ -138,7 +138,6 @@ def test_vectorized_udf_null_boolean(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_byte(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("byte", ByteType())
         df = self.spark.createDataFrame(data, schema)
@@ -147,7 +146,6 @@ def test_vectorized_udf_null_byte(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_short(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("short", ShortType())
         df = self.spark.createDataFrame(data, schema)
@@ -156,7 +154,6 @@ def test_vectorized_udf_null_short(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_int(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("int", IntegerType())
         df = self.spark.createDataFrame(data, schema)
@@ -165,7 +162,6 @@ def test_vectorized_udf_null_int(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_long(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("long", LongType())
         df = self.spark.createDataFrame(data, schema)
@@ -174,7 +170,6 @@ def test_vectorized_udf_null_long(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_float(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("float", FloatType())
         df = self.spark.createDataFrame(data, schema)
@@ -183,7 +178,6 @@ def test_vectorized_udf_null_float(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_double(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("double", DoubleType())
         df = self.spark.createDataFrame(data, schema)
@@ -192,8 +186,6 @@ def test_vectorized_udf_null_double(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_decimal(self):
-        from decimal import Decimal
-        from pyspark.sql.functions import pandas_udf, col
         data = [(Decimal(3.0),), (Decimal(5.0),), (Decimal(-1.0),), (None,)]
         schema = StructType().add("decimal", DecimalType(38, 18))
         df = self.spark.createDataFrame(data, schema)
@@ -202,7 +194,6 @@ def test_vectorized_udf_null_decimal(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_string(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [("foo",), (None,), ("bar",), ("bar",)]
         schema = StructType().add("str", StringType())
         df = self.spark.createDataFrame(data, schema)
@@ -211,7 +202,6 @@ def test_vectorized_udf_null_string(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_string_in_udf(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
         str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType())
@@ -220,7 +210,6 @@ def test_vectorized_udf_string_in_udf(self):
         self.assertEquals(expected.collect(), actual.collect())
 
     def test_vectorized_udf_datatype_string(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10).select(
             col('id').cast('string').alias('str'),
             col('id').cast('int').alias('int'),
@@ -244,9 +233,8 @@ def test_vectorized_udf_datatype_string(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_binary(self):
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, col
+
         if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
             with QuietTest(self.sc):
                 with self.assertRaisesRegexp(
@@ -262,7 +250,6 @@ def test_vectorized_udf_null_binary(self):
             self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_array_type(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [([1, 2],), ([3, 4],)]
         array_schema = StructType([StructField("array", 
ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
@@ -271,7 +258,6 @@ def test_vectorized_udf_array_type(self):
         self.assertEquals(df.collect(), result.collect())
 
     def test_vectorized_udf_null_array(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)]
         array_schema = StructType([StructField("array", 
ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
@@ -280,7 +266,6 @@ def test_vectorized_udf_null_array(self):
         self.assertEquals(df.collect(), result.collect())
 
     def test_vectorized_udf_complex(self):
-        from pyspark.sql.functions import pandas_udf, col, expr
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'),
@@ -293,7 +278,6 @@ def test_vectorized_udf_complex(self):
         self.assertEquals(expected.collect(), res.collect())
 
     def test_vectorized_udf_exception(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
         with QuietTest(self.sc):
@@ -301,8 +285,8 @@ def test_vectorized_udf_exception(self):
                 df.select(raise_exception(col('id'))).collect()
 
     def test_vectorized_udf_invalid_length(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         df = self.spark.range(10)
         raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
         with QuietTest(self.sc):
@@ -312,7 +296,6 @@ def test_vectorized_udf_invalid_length(self):
                 df.select(raise_exception(col('id'))).collect()
 
     def test_vectorized_udf_chained(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         f = pandas_udf(lambda x: x + 1, LongType())
         g = pandas_udf(lambda x: x - 1, LongType())
@@ -320,7 +303,6 @@ def test_vectorized_udf_chained(self):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_wrong_return_type(self):
-        from pyspark.sql.functions import pandas_udf
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     NotImplementedError,
@@ -328,7 +310,6 @@ def test_vectorized_udf_wrong_return_type(self):
                 pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
 
     def test_vectorized_udf_return_scalar(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         f = pandas_udf(lambda x: 1.0, DoubleType())
         with QuietTest(self.sc):
@@ -336,7 +317,6 @@ def test_vectorized_udf_return_scalar(self):
                 df.select(f(col('id'))).collect()
 
     def test_vectorized_udf_decorator(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
 
         @pandas_udf(returnType=LongType())
@@ -346,21 +326,18 @@ def identity(x):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_empty_partition(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
         f = pandas_udf(lambda x: x, LongType())
         res = df.select(f(col('id')))
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_varargs(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
         f = pandas_udf(lambda *v: v[0], LongType())
         res = df.select(f(col('id')))
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_unsupported_types(self):
-        from pyspark.sql.functions import pandas_udf
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     NotImplementedError,
@@ -368,8 +345,6 @@ def test_vectorized_udf_unsupported_types(self):
                 pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
 
     def test_vectorized_udf_dates(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import date
         schema = StructType().add("idx", LongType()).add("date", DateType())
         data = [(0, date(1969, 1, 1),),
                 (1, date(2012, 2, 2),),
@@ -405,8 +380,6 @@ def check_data(idx, date, date_copy):
             self.assertIsNone(result[i][3])  # "check_data" col
 
     def test_vectorized_udf_timestamps(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import datetime
         schema = StructType([
             StructField("idx", LongType(), True),
             StructField("timestamp", TimestampType(), True)])
@@ -447,8 +420,8 @@ def check_data(idx, timestamp, timestamp_copy):
             self.assertIsNone(result[i][3])  # "check_data" col
 
     def test_vectorized_udf_return_timestamp_tz(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         df = self.spark.range(10)
 
         @pandas_udf(returnType=TimestampType())
@@ -465,8 +438,8 @@ def gen_timestamps(id):
             self.assertEquals(expected, ts)
 
     def test_vectorized_udf_check_config(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 
3}):
             df = self.spark.range(10, numPartitions=1)
 
@@ -479,9 +452,8 @@ def check_records_per_batch(x):
                 self.assertTrue(r <= 3)
 
     def test_vectorized_udf_timestamps_respect_session_timezone(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import datetime
         import pandas as pd
+
         schema = StructType([
             StructField("idx", LongType(), True),
             StructField("timestamp", TimestampType(), True)])
@@ -519,8 +491,6 @@ def 
test_vectorized_udf_timestamps_respect_session_timezone(self):
 
     def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained 
UDF evaluations
-        from pyspark.sql.functions import pandas_udf, col
-
         @pandas_udf('double')
         def plus_ten(v):
             return v + 10
@@ -533,8 +503,6 @@ def plus_ten(v):
         self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
 
     def test_nondeterministic_vectorized_udf_in_aggregate(self):
-        from pyspark.sql.functions import sum
-
         df = self.spark.range(10)
         random_udf = self.nondeterministic_vectorized_udf
 
@@ -545,8 +513,6 @@ def test_nondeterministic_vectorized_udf_in_aggregate(self):
                 df.agg(sum(random_udf(df.id))).collect()
 
     def test_register_vectorized_udf_basic(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, col, expr
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'))
@@ -563,11 +529,10 @@ def test_register_vectorized_udf_basic(self):
 
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
-        from pyspark.sql.functions import pandas_udf
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
-        dt = [datetime.datetime(2015, 11, 1, 0, 30),
-              datetime.datetime(2015, 11, 1, 1, 30),
-              datetime.datetime(2015, 11, 1, 2, 30)]
+        dt = [datetime(2015, 11, 1, 0, 30),
+              datetime(2015, 11, 1, 1, 30),
+              datetime(2015, 11, 1, 2, 30)]
         df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
         foo_udf = pandas_udf(lambda x: x, 'timestamp')
         result = df.withColumn('time', foo_udf(df.time))
@@ -593,7 +558,6 @@ def test_type_annotation(self):
 
     def test_mixed_udf(self):
         import pandas as pd
-        from pyspark.sql.functions import col, udf, pandas_udf
 
         df = self.spark.range(0, 1).toDF('v')
 
@@ -696,8 +660,6 @@ def f4(x):
 
     def test_mixed_udf_and_sql(self):
         import pandas as pd
-        from pyspark.sql import Column
-        from pyspark.sql.functions import udf, pandas_udf
 
         df = self.spark.range(0, 1).toDF('v')
 
@@ -758,7 +720,6 @@ def test_datasource_with_udf(self):
         # This needs to a separate test because Arrow dependency is optional
         import pandas as pd
         import numpy as np
-        from pyspark.sql.functions import pandas_udf, lit, col
 
         path = tempfile.mkdtemp()
         shutil.rmtree(path)
diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py b/python/pyspark/sql/tests/test_pandas_udf_window.py
index f0e6d2696df62..0a7a19c1c0814 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -18,6 +18,8 @@
 import unittest
 
 from pyspark.sql.utils import AnalysisException
+from pyspark.sql.functions import array, explode, col, lit, mean, min, max, rank, \
+    udf, pandas_udf, PandasUDFType
 from pyspark.sql.window import Window
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
@@ -30,7 +32,6 @@
 class WindowPandasUDFTests(ReusedSQLTestCase):
     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))) \
@@ -39,18 +40,14 @@ def data(self):
 
     @property
     def python_plus_one(self):
-        from pyspark.sql.functions import udf
         return udf(lambda v: v + 1, 'double')
 
     @property
     def pandas_scalar_time_two(self):
-        from pyspark.sql.functions import pandas_udf
         return pandas_udf(lambda v: v * 2, 'double')
 
     @property
     def pandas_agg_mean_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def avg(v):
             return v.mean()
@@ -58,8 +55,6 @@ def avg(v):
 
     @property
     def pandas_agg_max_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def max(v):
             return v.max()
@@ -67,8 +62,6 @@ def max(v):
 
     @property
     def pandas_agg_min_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def min(v):
             return v.min()
@@ -88,8 +81,6 @@ def unpartitioned_window(self):
         return Window.partitionBy()
 
     def test_simple(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -105,8 +96,6 @@ def test_simple(self):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
 
     def test_multiple_udfs(self):
-        from pyspark.sql.functions import max, min, mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -121,8 +110,6 @@ def test_multiple_udfs(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_replace_existing(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -132,8 +119,6 @@ def test_replace_existing(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_mixed_sql(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
         mean_udf = self.pandas_agg_mean_udf
@@ -144,8 +129,6 @@ def test_mixed_sql(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_mixed_udf(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -171,8 +154,6 @@ def test_mixed_udf(self):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
 
     def test_without_partitionBy(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unpartitioned_window
         mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +168,6 @@ def test_without_partitionBy(self):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
 
     def test_mixed_sql_and_udf(self):
-        from pyspark.sql.functions import max, min, rank, col
-
         df = self.data
         w = self.unbounded_window
         ow = self.ordered_window
@@ -221,8 +200,6 @@ def test_mixed_sql_and_udf(self):
         self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
 
     def test_array_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
         w = self.unbounded_window
 
@@ -231,8 +208,6 @@ def test_array_type(self):
         self.assertEquals(result1.first()['v2'], [1.0, 2.0])
 
     def test_invalid_args(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
         w = self.unbounded_window
         ow = self.ordered_window


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Clean up import statements in pandas udf tests
> ----------------------------------------------
>
>                 Key: SPARK-26364
>                 URL: https://issues.apache.org/jira/browse/SPARK-26364
>             Project: Spark
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 2.4.0
>            Reporter: Li Jin
>            Assignee: Li Jin
>            Priority: Minor
>             Fix For: 3.0.0
>
>
> Per discussion [https://github.com/apache/spark/pull/22305/files#r241215618] 
> we should clean up the import statements in test_pandas_udf* and move them to 
> the top. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to