harishkrao commented on code in PR #28950:
URL: https://github.com/apache/airflow/pull/28950#discussion_r1083704974
##########
airflow/providers/databricks/sensors/databricks.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Databricks sensors."""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+from typing import Any, Callable, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.providers.common.sql.hooks.sql import fetch_all_handler
+from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.context import Context
+
+
+class DatabricksSqlSensor(BaseSensorOperator):
+    """
+    Generic Databricks SQL sensor.
+
+    :param databricks_conn_id:str=DatabricksSqlHook.default_conn_name: Specify the name of the connection
+        to use with Databricks on Airflow
+    :param http_path:str: Specify the path to the sql endpoint
+    :param sql_endpoint_name:str: Specify the name of the sql endpoint to use
+    :param session_configuration: Pass in the session configuration to be used
+    :param http_headers:list[tuple[str, str]]: Pass http headers to the databricks API
+    :param catalog:str|None=None: Specify the catalog to use for the query
+    :param schema:str|None="default": Specify the schema of the table to be queried
+    :param table_name:str: Specify the table that we want to monitor
+    :param partition_name:dict|: Pass in the partition name to be used for the sensor
+    :param handler:Callable[[Any, Any]=fetch_all_handler: Define the handler function that will be used
+        to process the results of a query
+    :param db_sensor_type:str: Choose the sensor you want to use. Available options: table_partition,
+        table_changes.
+    :param timestamp:datetime: To be used with query filters or as other argument values for timestamp
+    :param caller: Identify the source of this sensor in logs
+    :param client_parameters:dict[str, Any]: Additional client parameters
+    """
+
+    def __init__(
+        self,
+        *,
+        databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
+        http_path: str | None = None,
+        sql_endpoint_name: str | None = None,
+        session_configuration=None,
+        http_headers: list[tuple[str, str]] | None = None,
+        catalog: str = "",
+        schema: str | None = "default",
+        table_name: str,
+        partition_name: dict = {"date": "2023-1-1"},
+        handler: Callable[[Any], Any] = fetch_all_handler,
+        db_sensor_type: str,
+        timestamp: datetime = datetime.now() - timedelta(days=7),

Review Comment:
   The reason I thought it needs a default value is to extract history for a time period irrespective of any custom value provided by the user, for example the past 7 days.
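For context on why the reviewer flagged this line: Python evaluates a default argument value once, when the `def` statement runs, which for an Airflow operator means when the DAG file is parsed, not when each task runs. The "past 7 days" window would therefore be frozen at parse time. Below is a minimal sketch of the pitfall and the conventional `None`-default fix; the function names are illustrative, not part of the PR:

```python
from __future__ import annotations

from datetime import datetime, timedelta


# Pitfall: the default is evaluated once, when the `def` statement runs
# (for an Airflow operator, when the DAG file is parsed), not per call.
def history_window(timestamp: datetime = datetime.now() - timedelta(days=7)) -> datetime:
    return timestamp


# Conventional fix: default to None and compute the fallback per call.
def history_window_fixed(timestamp: datetime | None = None) -> datetime:
    if timestamp is None:
        timestamp = datetime.now() - timedelta(days=7)
    return timestamp


# Every no-argument call to history_window() returns the same frozen value;
# history_window_fixed() recomputes the window on each invocation. The same
# evaluate-once rule makes the mutable default
# partition_name={"date": "2023-1-1"} in the signature above a single dict
# shared across all calls.
```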
##########
airflow/providers/databricks/sensors/databricks.py:
##########

Review Comment:
   Removed it.
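With the default removed, the seven-day window described in the first comment would be supplied by the caller instead. A hypothetical usage sketch under that assumption; the connection, endpoint, and table names are invented, and the import path simply mirrors the file under review, which may have changed after this discussion:

```python
from datetime import datetime, timedelta

from airflow.providers.databricks.sensors.databricks import DatabricksSqlSensor

# The caller now chooses the history window explicitly. Note that
# datetime.now() at DAG-file level still runs at parse time; deriving the
# window from the run's logical date is the usual alternative in real DAGs.
check_changes = DatabricksSqlSensor(
    task_id="check_table_changes",            # illustrative task id
    databricks_conn_id="databricks_default",  # illustrative connection
    sql_endpoint_name="my_sql_endpoint",      # illustrative endpoint
    schema="default",
    table_name="my_table",                    # illustrative table
    db_sensor_type="table_changes",
    timestamp=datetime.now() - timedelta(days=7),
)
```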
