This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 84a65bd1cf9 [SPARK-42944][PYTHON][FOLLOW-UP] Rename tests from
foreachBatch to foreach_batch
84a65bd1cf9 is described below
commit 84a65bd1cf95775c4210c6dac8026551fd9d150f
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Sat Aug 26 21:51:29 2023 -0700
[SPARK-42944][PYTHON][FOLLOW-UP] Rename tests from foreachBatch to
foreach_batch
### What changes were proposed in this pull request?
This PR proposes to rename the foreachBatch tests, and the foreachBatch worker module, from `foreachBatch` to `foreach_batch`.
### Why are the changes needed?
Non-API names should follow the snake_case naming convention, per PEP 8.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
The existing CI on this PR exercises the renamed tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42675 from HyukjinKwon/pyspark-connect.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../planner/StreamingForeachBatchHelper.scala | 2 +-
dev/sparktestsupport/modules.py | 4 ++--
...eachBatch_worker.py => foreach_batch_worker.py} | 0
...oreachBatch.py => test_parity_foreach_batch.py} | 12 +++++-----
...achBatch.py => test_streaming_foreach_batch.py} | 28 +++++++++++-----------
5 files changed, 23 insertions(+), 23 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
index ef7195439f9..c30e08bc39d 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
@@ -108,7 +108,7 @@ object StreamingForeachBatchHelper extends Logging {
pythonFn,
connectUrl,
sessionHolder.sessionId,
- "pyspark.sql.connect.streaming.worker.foreachBatch_worker")
+ "pyspark.sql.connect.streaming.worker.foreach_batch_worker")
val (dataOut, dataIn) = runner.init()
val foreachBatchRunnerFn: FnArgsWithId => Unit = (args: FnArgsWithId) => {
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 3c018ac7c83..741b89466be 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -497,7 +497,7 @@ pyspark_sql = Module(
"pyspark.sql.tests.test_session",
"pyspark.sql.tests.streaming.test_streaming",
"pyspark.sql.tests.streaming.test_streaming_foreach",
- "pyspark.sql.tests.streaming.test_streaming_foreachBatch",
+ "pyspark.sql.tests.streaming.test_streaming_foreach_batch",
"pyspark.sql.tests.streaming.test_streaming_listener",
"pyspark.sql.tests.test_types",
"pyspark.sql.tests.test_udf",
@@ -866,7 +866,7 @@ pyspark_connect = Module(
"pyspark.sql.tests.connect.streaming.test_parity_streaming",
"pyspark.sql.tests.connect.streaming.test_parity_listener",
"pyspark.sql.tests.connect.streaming.test_parity_foreach",
- "pyspark.sql.tests.connect.streaming.test_parity_foreachBatch",
+ "pyspark.sql.tests.connect.streaming.test_parity_foreach_batch",
"pyspark.sql.tests.connect.test_parity_pandas_grouped_map_with_state",
"pyspark.sql.tests.connect.test_parity_pandas_udf_scalar",
"pyspark.sql.tests.connect.test_parity_pandas_udf_grouped_agg",
diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
similarity index 100%
rename from python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
rename to python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
diff --git
a/python/pyspark/sql/tests/connect/streaming/test_parity_foreachBatch.py
b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
similarity index 87%
rename from
python/pyspark/sql/tests/connect/streaming/test_parity_foreachBatch.py
rename to
python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
index 0718c6a88b0..e4577173687 100644
--- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreachBatch.py
+++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
@@ -17,19 +17,19 @@
import unittest
-from pyspark.sql.tests.streaming.test_streaming_foreachBatch import
StreamingTestsForeachBatchMixin
+from pyspark.sql.tests.streaming.test_streaming_foreach_batch import
StreamingTestsForeachBatchMixin
from pyspark.testing.connectutils import ReusedConnectTestCase
from pyspark.errors import PySparkPicklingError
class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin,
ReusedConnectTestCase):
@unittest.skip("SPARK-44463: Error handling needs improvement in connect
foreachBatch")
- def test_streaming_foreachBatch_propagates_python_errors(self):
- super().test_streaming_foreachBatch_propagates_python_errors
+ def test_streaming_foreach_batch_propagates_python_errors(self):
+ super().test_streaming_foreach_batch_propagates_python_errors()
@unittest.skip("This seems specific to py4j and pinned threads. The
intention is unclear")
- def test_streaming_foreachBatch_graceful_stop(self):
- super().test_streaming_foreachBatch_graceful_stop()
+ def test_streaming_foreach_batch_graceful_stop(self):
+ super().test_streaming_foreach_batch_graceful_stop()
# class StreamingForeachBatchParityTests(ReusedConnectTestCase):
def test_accessing_spark_session(self):
@@ -63,7 +63,7 @@ class
StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedCo
if __name__ == "__main__":
import unittest
- from pyspark.sql.tests.connect.streaming.test_parity_foreachBatch import *
# noqa: F401,E501
+ from pyspark.sql.tests.connect.streaming.test_parity_foreach_batch import
* # noqa: F401,E501
try:
import xmlrunner # type: ignore[import]
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreachBatch.py
b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
similarity index 90%
rename from python/pyspark/sql/tests/streaming/test_streaming_foreachBatch.py
rename to python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
index 65a0f6279fb..af2831ef193 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_foreachBatch.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
@@ -25,7 +25,7 @@ def my_test_function_1():
class StreamingTestsForeachBatchMixin:
- def test_streaming_foreachBatch(self):
+ def test_streaming_foreach_batch(self):
q = None
def collectBatch(batch_df, batch_id):
@@ -41,7 +41,7 @@ class StreamingTestsForeachBatchMixin:
if q:
q.stop()
- def test_streaming_foreachBatch_tempview(self):
+ def test_streaming_foreach_batch_tempview(self):
q = None
def collectBatch(batch_df, batch_id):
@@ -63,7 +63,7 @@ class StreamingTestsForeachBatchMixin:
if q:
q.stop()
- def test_streaming_foreachBatch_propagates_python_errors(self):
+ def test_streaming_foreach_batch_propagates_python_errors(self):
from pyspark.errors import StreamingQueryException
q = None
@@ -82,7 +82,7 @@ class StreamingTestsForeachBatchMixin:
if q:
q.stop()
- def test_streaming_foreachBatch_graceful_stop(self):
+ def test_streaming_foreach_batch_graceful_stop(self):
# SPARK-39218: Make foreachBatch streaming query stop gracefully
def func(batch_df, _):
batch_df.sparkSession._jvm.java.lang.Thread.sleep(10000)
@@ -92,8 +92,8 @@ class StreamingTestsForeachBatchMixin:
q.stop()
self.assertIsNone(q.exception(), "No exception has to be propagated.")
- def test_streaming_foreachBatch_spark_session(self):
- table_name = "testTable_foreachBatch"
+ def test_streaming_foreach_batch_spark_session(self):
+ table_name = "testTable_foreach_batch"
def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
@@ -115,8 +115,8 @@ class StreamingTestsForeachBatchMixin:
)
self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
- def test_streaming_foreachBatch_path_access(self):
- table_name = "testTable_foreachBatch_path"
+ def test_streaming_foreach_batch_path_access(self):
+ table_name = "testTable_foreach_batch_path"
def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
@@ -141,11 +141,11 @@ class StreamingTestsForeachBatchMixin:
def my_test_function_2():
return 2
- def test_streaming_foreachBatch_fuction_calling(self):
+ def test_streaming_foreach_batch_fuction_calling(self):
def my_test_function_3():
return 3
- table_name = "testTable_foreachBatch_function"
+ table_name = "testTable_foreach_batch_function"
def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
@@ -175,10 +175,10 @@ class StreamingTestsForeachBatchMixin:
)
self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
- def test_streaming_foreachBatch_import(self):
- import time # not imported in foreachBatch_worker
+ def test_streaming_foreach_batch_import(self):
+ import time # not imported in foreach_batch_worker
- table_name = "testTable_foreachBatch_import"
+ table_name = "testTable_foreach_batch_import"
def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
@@ -204,7 +204,7 @@ class
StreamingTestsForeachBatch(StreamingTestsForeachBatchMixin, ReusedSQLTestC
if __name__ == "__main__":
import unittest
- from pyspark.sql.tests.streaming.test_streaming_foreachBatch import * #
noqa: F401
+ from pyspark.sql.tests.streaming.test_streaming_foreach_batch import * #
noqa: F401
try:
import xmlrunner
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]