eladkal commented on code in PR #34880:
URL: https://github.com/apache/airflow/pull/34880#discussion_r1356610199
##########
airflow/providers/google/cloud/hooks/dataprep.py:
##########
@@ -137,6 +137,20 @@ def run_job_group(self, body_request: dict) -> dict[str,
Any]:
self._raise_for_status(response)
return response.json()
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
max=10))
+ def create_flow(self, *, body_request: dict) -> dict:
+ """
+ Creates flow.
+
+ :param body_request: Body of the POST request to be sent.
+ For more details check
https://clouddataprep.com/documentation/api#operation/createFlow
+ """
+ endpoint = "/v4/flows"
Review Comment:
I don't know this API but please double check if this is what you want.
Changing this in the future means breaking change to the provider.
Some APIs are defaulted to latest version unless use specify specific
version that might be better solution here if the API supports it.
##########
airflow/providers/google/cloud/hooks/dataprep.py:
##########
@@ -205,3 +219,74 @@ def _raise_for_status(self, response:
requests.models.Response) -> None:
except HTTPError:
self.log.error(response.json().get("exception"))
raise
+
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
max=10))
+ def create_imported_dataset(self, *, body_request: dict) -> dict:
+ """
+ Creates imported dataset.
+
+ :param body_request: Body of the POST request to be sent.
+ For more details check
https://clouddataprep.com/documentation/api#operation/createImportedDataset
+ """
+ endpoint = "/v4/importedDatasets"
+ url: str = urljoin(self._base_url, endpoint)
+ response = requests.post(url, headers=self._headers,
data=json.dumps(body_request))
+ self._raise_for_status(response)
+ return response.json()
+
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
max=10))
+ def create_wrangled_dataset(self, *, body_request: dict) -> dict:
+ """
+ Creates wrangled dataset.
+
+ :param body_request: Body of the POST request to be sent.
+ For more details check
+
https://api.trifacta.com/dataprep-enterprise-cloud/index.html#tag/WrangledDataset
+ """
+ endpoint = "/v4/wrangledDatasets"
+ url: str = urljoin(self._base_url, endpoint)
+ response = requests.post(url, headers=self._headers,
data=json.dumps(body_request))
+ self._raise_for_status(response)
+ return response.json()
+
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
max=10))
+ def create_output_object(self, *, body_request: dict) -> dict:
+ """
+ Creates output.
+
+ :param body_request: Body of the POST request to be sent.
+ For more details check
+
https://api.trifacta.com/dataprep-premium/index.html#operation/createOutputObject
+ """
+ endpoint = "/v4/outputObjects"
+ url: str = urljoin(self._base_url, endpoint)
+ response = requests.post(url, headers=self._headers,
data=json.dumps(body_request))
+ self._raise_for_status(response)
+ return response.json()
+
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
max=10))
+ def create_write_settings(self, *, body_request: dict) -> dict:
+ """
+ Creates write settings.
+
+ :param body_request: Body of the POST request to be sent.
+ For more details check
+
https://api.trifacta.com/dataprep-premium/index.html#operation/createWriteSetting
Review Comment:
what is this link?
##########
airflow/providers/google/cloud/hooks/dataprep.py:
##########
@@ -137,6 +137,20 @@ def run_job_group(self, body_request: dict) -> dict[str,
Any]:
self._raise_for_status(response)
return response.json()
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
max=10))
+ def create_flow(self, *, body_request: dict) -> dict:
+ """
+ Creates flow.
+
+ :param body_request: Body of the POST request to be sent.
+ For more details check
https://clouddataprep.com/documentation/api#operation/createFlow
Review Comment:
what is this link? is this google domain?
We must use only official links
I know links are by `https://cloud.google.com/dataprep`
##########
airflow/providers/google/cloud/hooks/dataprep.py:
##########
@@ -205,3 +219,74 @@ def _raise_for_status(self, response:
requests.models.Response) -> None:
except HTTPError:
self.log.error(response.json().get("exception"))
raise
+
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
max=10))
+ def create_imported_dataset(self, *, body_request: dict) -> dict:
+ """
+ Creates imported dataset.
+
+ :param body_request: Body of the POST request to be sent.
+ For more details check
https://clouddataprep.com/documentation/api#operation/createImportedDataset
+ """
+ endpoint = "/v4/importedDatasets"
Review Comment:
If we are left with v4 lets define it globally rather than in each function
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]