Copilot commented on code in PR #59736:
URL: https://github.com/apache/airflow/pull/59736#discussion_r2644025229


##########
providers/google/tests/unit/google/cloud/sensors/test_bigquery.py:
##########
@@ -255,3 +256,75 @@ def context():
     """
     context = {}
     return context
+
+
+class TestBigQueryTableStreamingBufferEmptySensor:
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_when_no_streaming_buffer(self, mock_hook):
+        """Test sensor returns True when table has no streaming buffer."""
+        task = BigQueryTableStreamingBufferEmptySensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            table_id=TEST_TABLE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+
+        # Mock table with no streaming buffer
+        mock_table = mock.MagicMock()
+        mock_table.streaming_buffer = None
+        mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table
+
+        result = task.poke(mock.MagicMock())
+
+        assert result is True
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_when_streaming_buffer_exists(self, mock_hook):
+        """Test sensor returns False when table has active streaming buffer."""
+        task = BigQueryTableStreamingBufferEmptySensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            table_id=TEST_TABLE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+        )
+
+        # Mock table with streaming buffer
+        mock_table = mock.MagicMock()
+        mock_streaming_buffer = mock.MagicMock()
+        mock_streaming_buffer.estimated_rows = 1000
+        mock_table.streaming_buffer = mock_streaming_buffer
+        mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table
+
+        result = task.poke(mock.MagicMock())
+
+        assert result is False
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_sensor_with_correct_table_reference(self, mock_hook):
+        """Test sensor constructs correct table reference."""
+        task = BigQueryTableStreamingBufferEmptySensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            table_id=TEST_TABLE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+        )
+
+        mock_table = mock.MagicMock()
+        mock_table.streaming_buffer = None
+        mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table
+
+        task.poke(mock.MagicMock())
+
+        # Verify correct table reference format
+        mock_hook.return_value.get_client.return_value.get_table.assert_called_once_with(
+            f"{TEST_PROJECT_ID}.{TEST_DATASET_ID}.{TEST_TABLE_ID}"
+        )
+

Review Comment:
   Test coverage is missing for the case where 
`streaming_buffer.estimated_rows` is 0. According to the implementation, if a 
streaming buffer exists but has `estimated_rows = 0`, the sensor returns False. 
It would be helpful to verify this behavior with a test to clarify whether an 
empty streaming buffer (with 0 rows) still needs to be flushed before DML 
operations can proceed.



##########
example_bigquery_streaming_buffer.py:
##########
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example DAG demonstrating BigQueryTableStreamingBufferEmptySensor usage.
+
+This example shows how to safely perform DML operations on BigQuery tables
+that receive streaming inserts.
+"""
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableStreamingBufferEmptySensor
+
+PROJECT_ID = "your-project-id"
+DATASET_ID = "your_dataset"
+TABLE_ID = "your_streaming_table"
+
+with DAG(
+    dag_id="example_bigquery_streaming_buffer_sensor",
+    start_date=datetime(2024, 1, 1),
+    schedule=None,
+    catchup=False,
+    tags=["example", "bigquery", "streaming"],
+) as dag:
+    # Wait for streaming buffer to be empty before running DML
+    wait_for_buffer_empty = BigQueryTableStreamingBufferEmptySensor(
+        task_id="wait_for_streaming_buffer_empty",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_ID,
+        table_id=TABLE_ID,
+        poke_interval=60,  # Check every 60 seconds
+        timeout=3600,  # Timeout after 1 hour
+        mode="reschedule",  # Free up worker slots while waiting
+    )
+
+    # Run DML operation once buffer is empty
+    run_dml = BigQueryInsertJobOperator(
+        task_id="run_dml_operation",
+        configuration={
+            "query": {
+                "query": f"""
+                    UPDATE `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
+                    SET status = 'processed'
+                    WHERE status = 'pending'
+                """,
+                "useLegacySql": False,
+            }
+        },
+    )

Review Comment:
   The BigQueryInsertJobOperator is missing the `project_id` parameter. While 
the project ID is embedded in the query string, it's better to explicitly pass 
it as a parameter for consistency with the sensor configuration and to make the 
code more maintainable.



##########
providers/google/tests/unit/google/cloud/sensors/test_bigquery.py:
##########
@@ -255,3 +256,75 @@ def context():
     """
     context = {}
     return context
+
+
+class TestBigQueryTableStreamingBufferEmptySensor:
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_when_no_streaming_buffer(self, mock_hook):
+        """Test sensor returns True when table has no streaming buffer."""
+        task = BigQueryTableStreamingBufferEmptySensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            table_id=TEST_TABLE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+
+        # Mock table with no streaming buffer
+        mock_table = mock.MagicMock()
+        mock_table.streaming_buffer = None
+        mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table
+
+        result = task.poke(mock.MagicMock())
+
+        assert result is True
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_when_streaming_buffer_exists(self, mock_hook):
+        """Test sensor returns False when table has active streaming buffer."""
+        task = BigQueryTableStreamingBufferEmptySensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            table_id=TEST_TABLE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+        )
+
+        # Mock table with streaming buffer
+        mock_table = mock.MagicMock()
+        mock_streaming_buffer = mock.MagicMock()
+        mock_streaming_buffer.estimated_rows = 1000
+        mock_table.streaming_buffer = mock_streaming_buffer
+        mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table
+
+        result = task.poke(mock.MagicMock())
+
+        assert result is False
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_sensor_with_correct_table_reference(self, mock_hook):
+        """Test sensor constructs correct table reference."""
+        task = BigQueryTableStreamingBufferEmptySensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            table_id=TEST_TABLE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+        )
+
+        mock_table = mock.MagicMock()
+        mock_table.streaming_buffer = None
+        mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table
+
+        task.poke(mock.MagicMock())
+
+        # Verify correct table reference format
+        mock_hook.return_value.get_client.return_value.get_table.assert_called_once_with(
+            f"{TEST_PROJECT_ID}.{TEST_DATASET_ID}.{TEST_TABLE_ID}"
+        )
+

Review Comment:
   Test coverage is missing for error scenarios, specifically when the table 
doesn't exist. The poke method calls `hook.get_client().get_table()` which will 
raise an exception if the table is not found. A test should verify that this 
error is properly propagated to help users understand what happens when they 
configure the sensor with an invalid table reference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to