michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477198132



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, 
GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       I talked about it with Kamil yesterday and he explained everything to me.

##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation 
https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the 
response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", 
include_deleted=self.include_deleted)
+        return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+    """
+    Create a ``jobGroup``, which launches the specified job as the 
authenticated user.
+    This performs the same action as clicking on the Run Job button in the 
application.
+
+    :param recipe_id: to run a job, you just specify the recipe identifier 
(``wrangledDataset.id``).
+        If the job is successful, all defined outputs are generated,
+        as defined in the output object, publications, and ``writeSettings`` 
objects
+        associated with the recipe.
+        To identify the ``wrangledDataset Id``, select the recipe icon in the 
UI flow view and
+        take the id shown in the URL. e.g. if the URL is /flows/10?recipe=7,

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to