This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fc98ccdf8f7e [SPARK-48746][PYTHON][SS][TESTS] Avoid using global temp
view in foreachBatch test case
fc98ccdf8f7e is described below
commit fc98ccdf8f7e1287726fb40cf08e7ed2f3864ef6
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Jun 28 16:28:47 2024 +0900
[SPARK-48746][PYTHON][SS][TESTS] Avoid using global temp view in
foreachBatch test case
### What changes were proposed in this pull request?
For regular foreachBatch tests, it's better to avoid using a global temp
view.
### Why are the changes needed?
Using temp views sometimes [confuses
users](https://stackoverflow.com/questions/62709024/temporary-view-in-spark-structure-streaming),
so in standard tests it is better to avoid using them unless we
explicitly test a global temp view.
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
Manually ran the tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47140 from HyukjinKwon/SPARK-48746.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/tests/streaming/test_streaming_foreach_batch.py | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
index ef286115a303..de8f30baebca 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
@@ -29,17 +29,18 @@ class StreamingTestsForeachBatchMixin:
q = None
def collectBatch(batch_df, batch_id):
- batch_df.createOrReplaceGlobalTempView("test_view")
+ batch_df.write.format("parquet").saveAsTable("test_table")
try:
df =
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
q = df.writeStream.foreachBatch(collectBatch).start()
q.processAllAvailable()
- collected = self.spark.sql("select * from
global_temp.test_view").collect()
+ collected = self.spark.sql("select * from test_table").collect()
self.assertTrue(len(collected), 2)
finally:
if q:
q.stop()
+ self.spark.sql("DROP TABLE IF EXISTS test_table")
def test_streaming_foreach_batch_tempview(self):
q = None
@@ -50,18 +51,19 @@ class StreamingTestsForeachBatchMixin:
# clone the session which is no longer same with the session used
to start the
# streaming query
assert len(batch_df.sparkSession.sql("SELECT * FROM
updates").collect()) == 2
- # Write to a global view verify on the repl/client side.
- batch_df.createOrReplaceGlobalTempView("temp_view")
+ # Write a table to verify on the repl/client side.
+ batch_df.write.format("parquet").saveAsTable("test_table")
try:
df =
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
q = df.writeStream.foreachBatch(collectBatch).start()
q.processAllAvailable()
- collected = self.spark.sql("SELECT * FROM
global_temp.temp_view").collect()
+ collected = self.spark.sql("SELECT * FROM test_table").collect()
self.assertTrue(len(collected[0]), 2)
finally:
if q:
q.stop()
+ self.spark.sql("DROP TABLE IF EXISTS test_table")
def test_streaming_foreach_batch_propagates_python_errors(self):
from pyspark.errors import StreamingQueryException
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]