This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 97d8634  [SPARK-33021][PYTHON][TESTS] Move functions related test cases into test_functions.py
97d8634 is described below

commit 97d8634450b39c1f4e5308b8a5308650e1e7489a
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Mon Sep 28 21:54:00 2020 -0700

    [SPARK-33021][PYTHON][TESTS] Move functions related test cases into test_functions.py
    
    ### What changes were proposed in this pull request?
    Move functions related test cases from `test_context.py` to `test_functions.py`.
    
    ### Why are the changes needed?
    To group the similar test cases.
    
    ### Does this PR introduce any user-facing change?
    Nope, test-only.
    
    ### How was this patch tested?
    Jenkins and GitHub Actions should test.
    
    Closes #29898 from HyukjinKwon/SPARK-33021.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
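A quick note on running the relocated tests locally (a minimal sketch, assuming a development
checkout where `pyspark` and its test helpers are importable; the dotted module path
`pyspark.sql.tests.test_functions` comes from the diff below, and the project's own test
scripts and CI remain the authoritative entry points):

    # Minimal local-run sketch (assumption: pyspark is on PYTHONPATH in a dev checkout).
    # Loads the relocated test module by dotted name and runs it with the stdlib runner.
    import unittest

    if __name__ == "__main__":
        suite = unittest.defaultTestLoader.loadTestsFromName(
            "pyspark.sql.tests.test_functions")
        unittest.TextTestRunner(verbosity=2).run(suite)
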
 python/pyspark/sql/tests/test_context.py   | 101 ----------------------------
 python/pyspark/sql/tests/test_functions.py | 102 ++++++++++++++++++++++++++++-
 2 files changed, 101 insertions(+), 102 deletions(-)

diff --git a/python/pyspark/sql/tests/test_context.py b/python/pyspark/sql/tests/test_context.py
index 92e5434..3a0c7bb 100644
--- a/python/pyspark/sql/tests/test_context.py
+++ b/python/pyspark/sql/tests/test_context.py
@@ -30,7 +30,6 @@ import py4j
 from pyspark import SparkContext, SQLContext
 from pyspark.sql import Row, SparkSession
 from pyspark.sql.types import *
-from pyspark.sql.window import Window
 from pyspark.testing.utils import ReusedPySparkTestCase
 
 
@@ -112,99 +111,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 
         shutil.rmtree(tmpPath)
 
-    def test_window_functions(self):
-        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
-        w = Window.partitionBy("value").orderBy("key")
-        from pyspark.sql import functions as F
-        sel = df.select(df.value, df.key,
-                        F.max("key").over(w.rowsBetween(0, 1)),
-                        F.min("key").over(w.rowsBetween(0, 1)),
-                        F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
-                        F.row_number().over(w),
-                        F.rank().over(w),
-                        F.dense_rank().over(w),
-                        F.ntile(2).over(w))
-        rs = sorted(sel.collect())
-        expected = [
-            ("1", 1, 1, 1, 1, 1, 1, 1, 1),
-            ("2", 1, 1, 1, 3, 1, 1, 1, 1),
-            ("2", 1, 2, 1, 3, 2, 1, 1, 1),
-            ("2", 2, 2, 2, 3, 3, 3, 2, 2)
-        ]
-        for r, ex in zip(rs, expected):
-            self.assertEqual(tuple(r), ex[:len(r)])
-
-    def test_window_functions_without_partitionBy(self):
-        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
-        w = Window.orderBy("key", df.value)
-        from pyspark.sql import functions as F
-        sel = df.select(df.value, df.key,
-                        F.max("key").over(w.rowsBetween(0, 1)),
-                        F.min("key").over(w.rowsBetween(0, 1)),
-                        F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
-                        F.row_number().over(w),
-                        F.rank().over(w),
-                        F.dense_rank().over(w),
-                        F.ntile(2).over(w))
-        rs = sorted(sel.collect())
-        expected = [
-            ("1", 1, 1, 1, 4, 1, 1, 1, 1),
-            ("2", 1, 1, 1, 4, 2, 2, 2, 1),
-            ("2", 1, 2, 1, 4, 3, 2, 2, 2),
-            ("2", 2, 2, 2, 4, 4, 4, 3, 2)
-        ]
-        for r, ex in zip(rs, expected):
-            self.assertEqual(tuple(r), ex[:len(r)])
-
-    def test_window_functions_cumulative_sum(self):
-        df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
-        from pyspark.sql import functions as F
-
-        # Test cumulative sum
-        sel = df.select(
-            df.key,
-            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
-        rs = sorted(sel.collect())
-        expected = [("one", 1), ("two", 3)]
-        for r, ex in zip(rs, expected):
-            self.assertEqual(tuple(r), ex[:len(r)])
-
-        # Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
-        sel = df.select(
-            df.key,
-            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
-        rs = sorted(sel.collect())
-        expected = [("one", 1), ("two", 3)]
-        for r, ex in zip(rs, expected):
-            self.assertEqual(tuple(r), ex[:len(r)])
-
-        # Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
-        frame_end = Window.unboundedFollowing + 1
-        sel = df.select(
-            df.key,
-            F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
-        rs = sorted(sel.collect())
-        expected = [("one", 3), ("two", 2)]
-        for r, ex in zip(rs, expected):
-            self.assertEqual(tuple(r), ex[:len(r)])
-
-    def test_collect_functions(self):
-        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
-        from pyspark.sql import functions
-
-        self.assertEqual(
-            sorted(df.select(functions.collect_set(df.key).alias('r')).collect()[0].r),
-            [1, 2])
-        self.assertEqual(
-            sorted(df.select(functions.collect_list(df.key).alias('r')).collect()[0].r),
-            [1, 1, 1, 2])
-        self.assertEqual(
-            sorted(df.select(functions.collect_set(df.value).alias('r')).collect()[0].r),
-            ["1", "2"])
-        self.assertEqual(
-            sorted(df.select(functions.collect_list(df.value).alias('r')).collect()[0].r),
-            ["1", "2", "2", "2"])
-
     def test_limit_and_take(self):
         df = self.spark.range(1, 1000, numPartitions=10)
 
@@ -223,13 +129,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         # Regression test for SPARK-17514: limit(n).collect() should perform the same as take(n)
         assert_runs_only_one_job_stage_and_task("collect_limit", lambda: df.limit(1).collect())
 
-    def test_datetime_functions(self):
-        from pyspark.sql import functions
-        from datetime import date
-        df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
-        parse_result = df.select(functions.to_date(functions.col("dateCol"))).first()
-        self.assertEquals(date(2017, 1, 22), parse_result['to_date(`dateCol`)'])
-
     def test_unbounded_frames(self):
         from pyspark.sql import functions as F
         from pyspark.sql import window
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index fa9ee57..fd2ad22 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -18,7 +18,7 @@
 import datetime
 import sys
 
-from pyspark.sql import Row
+from pyspark.sql import Row, Window
 from pyspark.sql.functions import udf, input_file_name
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 
@@ -337,6 +337,106 @@ class FunctionsTests(ReusedSQLTestCase):
 
         self.assertListEqual(actual, expected)
 
+    def test_window_functions(self):
+        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
+        w = Window.partitionBy("value").orderBy("key")
+        from pyspark.sql import functions as F
+        sel = df.select(df.value, df.key,
+                        F.max("key").over(w.rowsBetween(0, 1)),
+                        F.min("key").over(w.rowsBetween(0, 1)),
+                        F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
+                        F.row_number().over(w),
+                        F.rank().over(w),
+                        F.dense_rank().over(w),
+                        F.ntile(2).over(w))
+        rs = sorted(sel.collect())
+        expected = [
+            ("1", 1, 1, 1, 1, 1, 1, 1, 1),
+            ("2", 1, 1, 1, 3, 1, 1, 1, 1),
+            ("2", 1, 2, 1, 3, 2, 1, 1, 1),
+            ("2", 2, 2, 2, 3, 3, 3, 2, 2)
+        ]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
+    def test_window_functions_without_partitionBy(self):
+        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
+        w = Window.orderBy("key", df.value)
+        from pyspark.sql import functions as F
+        sel = df.select(df.value, df.key,
+                        F.max("key").over(w.rowsBetween(0, 1)),
+                        F.min("key").over(w.rowsBetween(0, 1)),
+                        F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
+                        F.row_number().over(w),
+                        F.rank().over(w),
+                        F.dense_rank().over(w),
+                        F.ntile(2).over(w))
+        rs = sorted(sel.collect())
+        expected = [
+            ("1", 1, 1, 1, 4, 1, 1, 1, 1),
+            ("2", 1, 1, 1, 4, 2, 2, 2, 1),
+            ("2", 1, 2, 1, 4, 3, 2, 2, 2),
+            ("2", 2, 2, 2, 4, 4, 4, 3, 2)
+        ]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
+    def test_window_functions_cumulative_sum(self):
+        df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
+        from pyspark.sql import functions as F
+
+        # Test cumulative sum
+        sel = df.select(
+            df.key,
+            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
+        rs = sorted(sel.collect())
+        expected = [("one", 1), ("two", 3)]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
+        # Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
+        sel = df.select(
+            df.key,
+            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
+        rs = sorted(sel.collect())
+        expected = [("one", 1), ("two", 3)]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
+        # Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
+        frame_end = Window.unboundedFollowing + 1
+        sel = df.select(
+            df.key,
+            F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
+        rs = sorted(sel.collect())
+        expected = [("one", 3), ("two", 2)]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
+    def test_collect_functions(self):
+        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
+        from pyspark.sql import functions
+
+        self.assertEqual(
+            sorted(df.select(functions.collect_set(df.key).alias('r')).collect()[0].r),
+            [1, 2])
+        self.assertEqual(
+            sorted(df.select(functions.collect_list(df.key).alias('r')).collect()[0].r),
+            [1, 1, 1, 2])
+        self.assertEqual(
+            sorted(df.select(functions.collect_set(df.value).alias('r')).collect()[0].r),
+            ["1", "2"])
+        self.assertEqual(
+            sorted(df.select(functions.collect_list(df.value).alias('r')).collect()[0].r),
+            ["1", "2", "2", "2"])
+
+    def test_datetime_functions(self):
+        from pyspark.sql import functions
+        from datetime import date
+        df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
+        parse_result = df.select(functions.to_date(functions.col("dateCol"))).first()
+        self.assertEquals(date(2017, 1, 22), parse_result['to_date(`dateCol`)'])
+
 
 if __name__ == "__main__":
     import unittest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
