turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477608830
##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
def execute(self, context: Dict):
self.log.info("Fetching data for job with id: %d ...", self.job_id)
- hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+ hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
response = hook.get_jobs_for_job_group(job_id=self.job_id)
return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+ """
+ Get the specified job group.
+ A job group is a job that is executed from a specific node in a flow.
+ API documentation
https://clouddataprep.com/documentation/api#section/Overview
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+ :param job_group_id The ID of the job that will be requests
+ :type job_group_id: int
+ :param embed Comma-separated list of objects to pull in as part of the
response
+ :type embed: string
+ :param include_deleted if set to "true", will include deleted objects
+ :type include_deleted: bool
+ """
+
+ template_fields = ("job_group_id",)
+
+ @apply_defaults
+ def __init__(
+ self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+ ) -> None:
+ super().__init__(**kwargs)
+ self.job_group_id = job_group_id
+ self.embed = embed
+ self.include_deleted = include_deleted
+
+ def execute(self, context: Dict):
+ self.log.info("Fetching data for job with id: %d ...",
self.job_group_id)
+ hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+ response = hook.get_job_group(
+ job_group_id=self.job_group_id,
+ embed=self.embed,
+ include_deleted=self.include_deleted,
+ )
+ return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+ """
+ Create a ``jobGroup``, which launches the specified job as the
authenticated user.
+ This performs the same action as clicking on the Run Job button in the
application.
+ To get recipe_id please follow the Dataprep API documentation
+ https://clouddataprep.com/documentation/api#operation/runJobGroup
+
+ :param recipe_id: The identifier for the recipe you would like to run.
+ :type recipe_id: int
+ """
+
+ def __init__(self, *, body_request: dict, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.body_request = body_request
+
+ def execute(self, context: None):
+ self.log.info("Creating a job...")
+ hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
Review comment:
The connection ID is hard-coded here; it should be configurable through the operator's interface.
##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
def execute(self, context: Dict):
self.log.info("Fetching data for job with id: %d ...", self.job_id)
- hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+ hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
response = hook.get_jobs_for_job_group(job_id=self.job_id)
return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+ """
+ Get the specified job group.
+ A job group is a job that is executed from a specific node in a flow.
+ API documentation
https://clouddataprep.com/documentation/api#section/Overview
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+ :param job_group_id The ID of the job that will be requests
+ :type job_group_id: int
+ :param embed Comma-separated list of objects to pull in as part of the
response
+ :type embed: string
+ :param include_deleted if set to "true", will include deleted objects
+ :type include_deleted: bool
+ """
+
+ template_fields = ("job_group_id",)
+
+ @apply_defaults
+ def __init__(
+ self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+ ) -> None:
+ super().__init__(**kwargs)
+ self.job_group_id = job_group_id
+ self.embed = embed
+ self.include_deleted = include_deleted
+
+ def execute(self, context: Dict):
+ self.log.info("Fetching data for job with id: %d ...",
self.job_group_id)
+ hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
Review comment:
The connection ID is hard-coded here; it should be configurable through the operator's interface.
##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
def execute(self, context: Dict):
self.log.info("Fetching data for job with id: %d ...", self.job_id)
- hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+ hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
Review comment:
The connection ID is hard-coded here; it should be configurable through the operator's interface.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org