MaksYermak commented on code in PR #61148:
URL: https://github.com/apache/airflow/pull/61148#discussion_r2852449901
##########
providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py:
##########
@@ -77,17 +76,7 @@ def __init__(
**kwargs,
) -> None:
if deferrable and "poke_interval" not in kwargs:
- # TODO: Remove once deprecated
- if "polling_interval" in kwargs:
- kwargs["poke_interval"] = kwargs["polling_interval"]
- warnings.warn(
- "Argument `poll_interval` is deprecated and will be
removed "
- "in a future release. Please use `poke_interval`
instead.",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- else:
- kwargs["poke_interval"] = 5
Review Comment:
@radhwene could you please restore this deprecated code for the current
operator? In the Google provider we have a deprecation policy:
https://airflow.apache.org/docs/apache-airflow-providers-google/stable/deprecation-policy.html
We cannot remove deprecated code without specifying a planned removal date.
##########
providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py:
##########
@@ -152,22 +162,105 @@
)
# [END howto_sensor_bigquery_table_partition_async]
+ # [START howto_sensor_bigquery_streaming_buffer_empty]
+ check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
+ task_id="check_streaming_buffer_empty",
+ project_id=PROJECT_ID,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME,
+ poke_interval=30,
+ timeout=5400, # 90 minutes - Google Cloud flushes streaming buffer
within 90 minutes
+ )
+ # [END howto_sensor_bigquery_streaming_buffer_empty]
+
+ # Streaming operations: INSERT, UPDATE, DELETE
+ # These operations write data to the streaming buffer before being flushed
to persistent storage
+ stream_insert = BigQueryInsertJobOperator(
+ task_id="stream_insert",
+ configuration={
+ "query": {
+ "query": STREAMING_INSERT_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+
+ stream_update = BigQueryInsertJobOperator(
+ task_id="stream_update",
+ configuration={
+ "query": {
+ "query": STREAMING_UPDATE_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+
+ stream_delete = BigQueryInsertJobOperator(
+ task_id="stream_delete",
+ configuration={
+ "query": {
+ "query": STREAMING_DELETE_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+
+ # [START howto_sensor_bigquery_streaming_buffer_empty_defered]
+ check_streaming_buffer_empty_def = BigQueryStreamingBufferEmptySensor(
+ task_id="check_streaming_buffer_empty_def",
+ project_id=PROJECT_ID,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME,
+ deferrable=True,
+ poke_interval=30,
+ timeout=5400, # 90 minutes - Google Cloud flushes streaming buffer
within 90 minutes
+ )
+ # [END howto_sensor_bigquery_streaming_buffer_empty_defered]
+
+ # [START howto_sensor_bigquery_streaming_buffer_empty_async]
+ check_streaming_buffer_empty_async = BigQueryStreamingBufferEmptySensor(
+ task_id="check_streaming_buffer_empty_async",
+ project_id=PROJECT_ID,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME,
+ poke_interval=30,
+ timeout=5400, # 90 minutes - Google Cloud flushes streaming buffer
within 90 minutes
+ )
+ # [END howto_sensor_bigquery_streaming_buffer_empty_async]
Review Comment:
@radhwene thank you for the explanation. If it is a duplicate of the sync
task, could you please remove this task from the system test and update the
documentation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]