Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-05-21 Thread via GitHub


chaoqin-li1123 commented on code in PR #46306:
URL: https://github.com/apache/spark/pull/46306#discussion_r1609292918


##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -150,50 +151,90 @@ def check_batch(df, batch_id):
         q.awaitTermination

Review Comment:
   Thanks for the reminder!






Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-05-21 Thread via GitHub


HyukjinKwon commented on code in PR #46306:
URL: https://github.com/apache/spark/pull/46306#discussion_r1609289839


##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -150,50 +151,90 @@ def check_batch(df, batch_id):
         q.awaitTermination

Review Comment:
   `q.awaitTermination()`
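   The parentheses are the whole point here: in Python, `q.awaitTermination` merely looks up the bound method and discards it, so nothing blocks and no failure can surface, whereas `q.awaitTermination()` actually makes the call. A minimal illustration (not from the PR, using a stand-in class rather than a real StreamingQuery):

   ```python
   class Query:
       """Stand-in for a streaming query handle, used only to show the no-op."""

       def awaitTermination(self, timeout=None):
           print("blocking until the query terminates")


   q = Query()
   q.awaitTermination    # no-op: evaluates to the bound method, nothing is called
   q.awaitTermination()  # correct: the method actually runs
   ```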






Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-05-21 Thread via GitHub


chaoqin-li1123 commented on code in PR #46306:
URL: https://github.com/apache/spark/pull/46306#discussion_r1609283800


##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -150,50 +151,90 @@ def check_batch(df, batch_id):
         q.awaitTermination

Review Comment:
   Could you elaborate? Is this test flaky?






Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-05-21 Thread via GitHub


HyukjinKwon commented on code in PR #46306:
URL: https://github.com/apache/spark/pull/46306#discussion_r1609264628


##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -150,50 +151,90 @@ def check_batch(df, batch_id):
         q.awaitTermination

Review Comment:
   @chaoqin-li1123 can we fix all those instances?






Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-05-01 Thread via GitHub


HyukjinKwon closed pull request #46306: [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader
URL: https://github.com/apache/spark/pull/46306





Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-05-01 Thread via GitHub


HyukjinKwon commented on PR #46306:
URL: https://github.com/apache/spark/pull/46306#issuecomment-2089292078

   Merged to master.





Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-04-30 Thread via GitHub


chaoqin-li1123 commented on code in PR #46306:
URL: https://github.com/apache/spark/pull/46306#discussion_r1585867083


##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -136,6 +137,33 @@ def streamWriter(self, schema, overwrite):
 
         return TestDataSource
 
+    def _get_simple_data_source(self):

Review Comment:
   Sounds good.



##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -150,6 +178,20 @@ def check_batch(df, batch_id):
         q.awaitTermination
         self.assertIsNone(q.exception(), "No exception has to be propagated.")
 
+    def test_simple_stream_reader(self):
+        self.spark.dataSource.register(self._get_simple_data_source())
+        df = self.spark.readStream.format("SimpleDataSource").load()
+
+        def check_batch(df, batch_id):
+            assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 1)])
+
+        q = df.writeStream.foreachBatch(check_batch).start()
+        while len(q.recentProgress) < 10:
+            time.sleep(0.2)
+        q.stop()
+        q.awaitTermination

Review Comment:
   Nice catch!
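   For reference, here is the shutdown sequence from the hunk above written as a standalone sketch rather than the merged test verbatim, with the missing parentheses added. The `SimpleDataSource` format is assumed to have been registered by the PR's `_get_simple_data_source()` helper; the session setup and import paths are illustrative assumptions:

   ```python
   import time

   from pyspark.sql import Row, SparkSession
   from pyspark.testing.utils import assertDataFrameEqual

   spark = SparkSession.builder.getOrCreate()

   # Assumes the PR's SimpleDataSource (a Python streaming data source) has
   # already been registered via spark.dataSource.register(...).
   df = spark.readStream.format("SimpleDataSource").load()


   def check_batch(batch_df, batch_id):
       # Each microbatch is expected to hold the next two consecutive values.
       assertDataFrameEqual(batch_df, [Row(batch_id * 2), Row(batch_id * 2 + 1)])


   q = df.writeStream.foreachBatch(check_batch).start()
   while len(q.recentProgress) < 10:  # let at least 10 microbatches complete
       time.sleep(0.2)
   q.stop()
   q.awaitTermination()  # parentheses required; a bare q.awaitTermination is a no-op
   assert q.exception() is None, "No exception has to be propagated."
   ```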






Re: [PR] [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader [spark]

2024-04-30 Thread via GitHub


chaoqin-li1123 commented on code in PR #46306:
URL: https://github.com/apache/spark/pull/46306#discussion_r1585866969


##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -150,6 +178,20 @@ def check_batch(df, batch_id):
         q.awaitTermination
         self.assertIsNone(q.exception(), "No exception has to be propagated.")
 
+    def test_simple_stream_reader(self):
+        self.spark.dataSource.register(self._get_simple_data_source())
+        df = self.spark.readStream.format("SimpleDataSource").load()
+
+        def check_batch(df, batch_id):
+            assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 1)])
+
+        q = df.writeStream.foreachBatch(check_batch).start()
+        while len(q.recentProgress) < 10:
+            time.sleep(0.2)
+        q.stop()
+        q.awaitTermination
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")
+
     def test_stream_writer(self):
         input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
         output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")

Review Comment:
   Fixed.



##
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##
@@ -150,6 +178,20 @@ def check_batch(df, batch_id):
         q.awaitTermination
         self.assertIsNone(q.exception(), "No exception has to be propagated.")
 
+    def test_simple_stream_reader(self):
+        self.spark.dataSource.register(self._get_simple_data_source())
+        df = self.spark.readStream.format("SimpleDataSource").load()
+
+        def check_batch(df, batch_id):
+            assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 1)])
+
+        q = df.writeStream.foreachBatch(check_batch).start()
+        while len(q.recentProgress) < 10:
+            time.sleep(0.2)
+        q.stop()
+        q.awaitTermination
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")
+
     def test_stream_writer(self):
         input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")

Review Comment:
   Fixed.


