Imbruced commented on code in PR #41122:
URL: https://github.com/apache/airflow/pull/41122#discussion_r1705417934


##########
airflow/providers/airbyte/hooks/airbyte.py:
##########
@@ -155,105 +130,59 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
                 raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
             time.sleep(wait_seconds)
             try:
-                job = self.get_job(job_id=(int(job_id)))
-                if self.api_type == "config":
-                    state = job.json()["job"]["status"]
-                else:
-                    state = job.json()["status"]
+                job = self.get_job_details(job_id=(int(job_id)))
+                state = job.status
+
             except AirflowException as err:
                 self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
                 continue
 
-            if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
+            if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
                 continue
-            if state == self.SUCCEEDED:
+            if state == JobStatusEnum.SUCCEEDED:
                 break
-            if state == self.ERROR:
+            if state == JobStatusEnum.FAILED:
                 raise AirflowException(f"Job failed:\n{job}")
-            elif state == self.CANCELLED:
+            elif state == JobStatusEnum.CANCELLED:
                 raise AirflowException(f"Job was cancelled:\n{job}")
             else:
                 raise AirflowException(f"Encountered unexpected state `{state}` for job_id `{job_id}`")
 
     def submit_sync_connection(self, connection_id: str) -> Any:
-        """
-        Submit a job to a Airbyte server.
-
-        :param connection_id: Required. The ConnectionId of the Airbyte Connection.
-        """
-        if self.api_type == "config":
-            return self.run(
-                endpoint=f"api/{self.api_version}/connections/sync",
-                json={"connectionId": connection_id},
-                headers={"accept": "application/json"},
-            )
-        else:
-            conn = self.get_connection(self.http_conn_id)
-            self.method = "POST"
-            return self.run(
-                endpoint=f"{self.api_version}/jobs",
-                headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
-                json={
-                    "jobType": "sync",
-                    "connectionId": connection_id,
-                },  # TODO: add an option to pass jobType = reset
-            )
-
-    def get_job(self, job_id: int) -> Any:
-        """
-        Get the resource representation for a job in Airbyte.
-
-        :param job_id: Required. Id of the Airbyte job
-        """
-        if self.api_type == "config":
-            return self.run(
-                endpoint=f"api/{self.api_version}/jobs/get",
-                json={"id": job_id},
-                headers={"accept": "application/json"},
-            )
-        else:
-            self.method = "GET"
-            conn = self.get_connection(self.http_conn_id)
-            return self.run(
-                endpoint=f"{self.api_version}/jobs/{job_id}",
-                headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
+        try:
+            res = self.airbyte_api.jobs.create_job(
+                request=JobCreateRequest(
+                    connection_id=connection_id,
+                    job_type=JobTypeEnum.SYNC,
+                )
             )
+            return res.job_response
+        except Exception as e:
+            raise AirflowException(str(e))

Review Comment:
   nit: here you pass the exception directly:
   
https://github.com/apache/airflow/pull/41122/files#diff-f4527df370ba3e14e4d474da1a3698416349f915d96537f13856e59b86b71230R104



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to