amoghrajesh commented on code in PR #67118:
URL: https://github.com/apache/airflow/pull/67118#discussion_r3260629059
##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -198,8 +221,63 @@ def execute(self, context: Context) -> None:
self.conf = inject_transport_information_into_spark_properties(self.conf, context)
if self._hook is None:
self._hook = self._get_hook()
+ if self._hook._should_track_driver_status:
+ return self.execute_resumable(context)
Review Comment:
Yes, this is a textbook deferrable case. This PR is specifically addressing
the synchronous operator case, which is a different problem from the deferrable
case.
For deferrable operators, the triggerer already handles continuity. The
failure mode I am trying to fix here (worker dies mid-poll, driver ID is lost,
retry resubmits a duplicate job) does not apply to deferrable operators once
they have deferred.
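
A minimal sketch of that failure mode and the resume path, for illustration only. It assumes a plain dict stands in for `task_state`, and `run_resumable`, `submit`, and `poll` are hypothetical names that are not taken from the PR:

```python
from typing import Callable, MutableMapping

# Key name as mentioned in this thread; the storage itself is a stand-in.
JOB_ID_KEY = "spark_job_id"


def run_resumable(
    task_state: MutableMapping[str, str],
    submit: Callable[[], str],
    poll: Callable[[str], str],
) -> str:
    """Submit at most once, then poll; a retry reuses the persisted driver ID."""
    job_id = task_state.get(JOB_ID_KEY)
    if job_id is None:
        job_id = submit()
        # Persist before polling, so a worker crash mid-poll cannot lose the ID.
        task_state[JOB_ID_KEY] = job_id
    return poll(job_id)
```

If the first attempt dies inside `poll`, the retry finds the stored ID and skips `submit`, so no duplicate job is created.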
The reason I am not using `defer()` here is that the deferrable path has its
own adoption barriers: it requires a separate Triggerer component, it introduces
an async programming model that differs from the standard synchronous one, and
it has seen very low adoption in practice despite being available for several
years. Teams that write custom operators tend to write synchronous ones. These
are reasons to solve the sync case on its own terms rather than requiring users
to migrate to deferrable.
The two approaches are complementary, not competing. The `spark_job_id` that
we persist to `task_state` is intentionally compatible with a future deferrable
implementation: a `SparkSubmitTrigger` could read the same key from
`task_state` to reconnect before deferring. But that's a different problem, as
I see it.
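
As a rough sketch of how an async consumer could adopt the same key: the snippet below is not an Airflow trigger. It assumes a dict-like `task_state`, a hypothetical `get_status` coroutine, and a set of terminal states chosen for illustration:

```python
import asyncio
from typing import Awaitable, Callable, Mapping, Optional

JOB_ID_KEY = "spark_job_id"
# Terminal states assumed for illustration only.
TERMINAL_STATES = {"FINISHED", "FAILED", "KILLED", "ERROR"}


async def reconnect_and_wait(
    task_state: Mapping[str, str],
    get_status: Callable[[str], Awaitable[str]],
    interval: float = 1.0,
) -> Optional[str]:
    """Poll an already-submitted job; return None if there is nothing to adopt."""
    job_id = task_state.get(JOB_ID_KEY)
    if job_id is None:
        return None
    while True:
        status = await get_status(job_id)
        if status in TERMINAL_STATES:
            return status
        # Yield to the event loop between polls instead of blocking a worker.
        await asyncio.sleep(interval)
```

Because both paths read one key, a job submitted by the synchronous path could in principle be picked up by an async poller without resubmission.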