This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4e69857195a6 Revert "[SPARK-47777][PYTHON][SS][TESTS] Add spark connect test for python streaming data source" 4e69857195a6 is described below commit 4e69857195a6f95c22f962e3eed950876036c04f Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Mon May 6 09:38:09 2024 +0900 Revert "[SPARK-47777][PYTHON][SS][TESTS] Add spark connect test for python streaming data source" This reverts commit 3d2b7fea7fe0a835166ba2b98973300f6844a29b. --- dev/sparktestsupport/modules.py | 1 - .../test_parity_python_streaming_datasource.py | 39 ---------------------- .../sql/tests/test_python_streaming_datasource.py | 6 +++- 3 files changed, 5 insertions(+), 41 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 8a94187bfced..e73cdd7b80c3 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -1047,7 +1047,6 @@ pyspark_connect = Module( "pyspark.sql.tests.connect.test_parity_arrow_grouped_map", "pyspark.sql.tests.connect.test_parity_arrow_cogrouped_map", "pyspark.sql.tests.connect.test_parity_python_datasource", - "pyspark.sql.tests.connect.test_parity_python_streaming_datasource", "pyspark.sql.tests.connect.test_utils", "pyspark.sql.tests.connect.client.test_artifact", "pyspark.sql.tests.connect.client.test_client", diff --git a/python/pyspark/sql/tests/connect/test_parity_python_streaming_datasource.py b/python/pyspark/sql/tests/connect/test_parity_python_streaming_datasource.py deleted file mode 100644 index 65bb4c021f4d..000000000000 --- a/python/pyspark/sql/tests/connect/test_parity_python_streaming_datasource.py +++ /dev/null @@ -1,39 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from pyspark.sql.tests.test_python_streaming_datasource import ( - BasePythonStreamingDataSourceTestsMixin, -) -from pyspark.testing.connectutils import ReusedConnectTestCase - - -class PythonStreamingDataSourceParityTests( - BasePythonStreamingDataSourceTestsMixin, ReusedConnectTestCase -): - pass - - -if __name__ == "__main__": - import unittest - from pyspark.sql.tests.connect.test_parity_python_streaming_datasource import * # noqa: F401 - - try: - import xmlrunner # type: ignore[import] - - testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) - except ImportError: - testRunner = None - unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py index 5125e9ad6dec..90f06223e009 100644 --- a/python/pyspark/sql/tests/test_python_streaming_datasource.py +++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py @@ -141,11 +141,15 @@ class BasePythonStreamingDataSourceTestsMixin: self.spark.dataSource.register(self._get_test_data_source()) df = self.spark.readStream.format("TestDataSource").load() + current_batch_id = -1 + def check_batch(df, batch_id): + nonlocal current_batch_id + current_batch_id = batch_id assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 1)]) q = df.writeStream.foreachBatch(check_batch).start() - while 
len(q.recentProgress) < 10: + while current_batch_id < 10: time.sleep(0.2) q.stop() q.awaitTermination --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org