justinpakzad commented on code in PR #68411:
URL: https://github.com/apache/airflow/pull/68411#discussion_r3408712295
##########
providers/elasticsearch/tests/unit/elasticsearch/utils/test_sql.py:
##########
@@ -179,3 +177,152 @@ def test_read_sql_to_polars_no_cursor_cleanup():
read_sql_to_polars(es, "SELECT *")
es.sql.clear_cursor.assert_not_called()
+
+
+def test_read_sql_to_polars_by_chunks_single_chunk():
+ es = _mock_es(
+ [
+ {
+ "columns": COLUMNS,
+ "rows": [[1, "a"], [2, "b"]],
+ }
+ ]
+ )
+
+ chunks = list(
+ read_sql_to_polars_by_chunks(
+ es,
+ "SELECT *",
+ chunksize=10,
+ )
+ )
+
+ assert len(chunks) == 1
+
+ df = chunks[0]
+ assert df.shape == (2, 2)
+ assert df.to_dict(as_series=False) == {
+ "id": [1, 2],
+ "name": ["a", "b"],
+ }
+
+
+def test_read_sql_to_polars_by_chunks_single_page_multiple_chunks():
+ es = _mock_es(
+ [
+ {
+ "columns": COLUMNS,
+ "rows": [
+ [1, "a"],
+ [2, "b"],
+ [3, "c"],
+ ],
+ }
+ ]
+ )
+
+ chunks = list(
+ read_sql_to_polars_by_chunks(
+ es,
+ "SELECT *",
+ chunksize=2,
+ )
+ )
+
+ assert [chunk.shape for chunk in chunks] == [
+ (2, 2),
+ (1, 2),
+ ]
+
+ assert chunks[0].to_dict(as_series=False) == {
+ "id": [1, 2],
+ "name": ["a", "b"],
+ }
+
+ assert chunks[1].to_dict(as_series=False) == {
+ "id": [3],
+ "name": ["c"],
+ }
+
+
+def test_read_sql_to_polars_by_chunks_across_cursor_pages():
+ es = _mock_es(
+ [
+ {
+ "columns": COLUMNS,
+ "rows": [[1, "a"], [2, "b"]],
+ "cursor": "cursor_1",
+ },
+ {
+ "rows": [[3, "c"], [4, "d"]],
+ "cursor": None,
+ },
+ ]
+ )
+
+ chunks = list(
+ read_sql_to_polars_by_chunks(
+ es,
+ "SELECT *",
+ chunksize=3,
+ )
+ )
+
+ assert len(chunks) == 2
+
+ assert chunks[0].to_dict(as_series=False) == {
+ "id": [1, 2, 3],
+ "name": ["a", "b", "c"],
+ }
+
+ assert chunks[1].to_dict(as_series=False) == {
+ "id": [4],
+ "name": ["d"],
+ }
+
+
+def test_read_sql_to_polars_by_chunks_clears_cursor():
Review Comment:
This test and the one below both cover the happy paths: when last_cursor exists,
cleanup runs; when no last_cursor exists, cleanup is skipped. Is it worth adding
a test for the case where an error occurs mid-iteration while last_cursor exists,
to verify that cleanup still happens?
##########
providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py:
##########
@@ -276,6 +276,32 @@ def test_get_df_polars(self, mock_reader):
max_rows=10,
)
+
@mock.patch("airflow.providers.elasticsearch.hooks.elasticsearch.read_sql_to_polars_by_chunks")
+ def test_get_df_polars_df_by_chunks(self, mock_reader):
+ mock_reader.return_value = "df"
Review Comment:
Is this needed here? It gets overridden a couple of lines below with
`mock_reader.return_value = generator`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]