razvanculea commented on code in PR #35137:
URL: https://github.com/apache/beam/pull/35137#discussion_r2137692786
##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -797,8 +802,65 @@ def pre_finalize(self, init_result, writer_results):
"""
raise NotImplementedError
+ def pre_finalize_windowed(self, init_result, writer_results, window=None):
+ """Pre-finalization stage for sink for unbounded PCollections.
+
+ Called after all bundle writes are complete and before finalize_write.
+ Used to setup and verify filesystem and sink states.
+
+ Args:
+ init_result: the result of ``initialize_write()`` invocation.
+ writer_results: an iterable containing results of ``Writer.close()``
+ invocations. This will only contain results of successful writes, and
+ will only contain the result of a single successful write for a given
+ bundle.
+ window: DoFn window
+
+ Returns:
+ An object that contains any sink specific state generated.
+ This object will be passed to finalize_windowed_write().
+ """
+ raise NotImplementedError
+
def finalize_write(self, init_result, writer_results, pre_finalize_result):
- """Finalizes the sink after all data is written to it.
+ """Finalizes the sink after all data is written to it. (batch)
+
+ Given the result of initialization and an iterable of results from bundle
+ writes, performs finalization after writing and closes the sink. Called
+ after all bundle writes are complete.
+
+ The bundle write results that are passed to finalize are those returned by
+ bundles that completed successfully. Although bundles may have been run
+ multiple times (for fault-tolerance), only one writer result will be passed
+ to finalize for each bundle. An implementation of finalize should perform
+ clean up of any failed and successfully retried bundles. Note that these
+ failed bundles will not have their writer result passed to finalize, so
+ finalize should be capable of locating any temporary/partial output written
+ by failed bundles.
+
+ If all retries of a bundle fails, the whole pipeline will fail *without*
+ finalize_write() being invoked.
+
+ A best practice is to make finalize atomic. If this is impossible given the
+ semantics of the sink, finalize should be idempotent, as it may be called
+ multiple times in the case of failure/retry or for redundancy.
+
+ Note that the iteration order of the writer results is not guaranteed to be
+ consistent if finalize is called multiple times.
+
+ Args:
+ init_result: the result of ``initialize_write()`` invocation.
+ writer_results: an iterable containing results of ``Writer.close()``
+ invocations. This will only contain results of successful writes, and
+ will only contain the result of a single successful write for a given
+ bundle.
+ pre_finalize_result: the result of ``pre_finalize()`` invocation.
+ """
+ raise NotImplementedError
+
+ def finalize_windowed_write(
+ self, init_result, writer_results, pre_finalize_result, w=None):
+ """Finalizes the sink after all data is written to it for a window.
Given the result of initialization and an iterable of results from bundle
Review Comment:
I agree, fixed as suggested.
##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -830,6 +892,7 @@ def finalize_write(self, init_result, writer_results,
pre_finalize_result):
will only contain the result of a single successful write for a given
bundle.
pre_finalize_result: the result of ``pre_finalize()`` invocation.
+ w: DoFn window
Review Comment:
I agree, fixed as suggested.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org