This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3c93f7d17ba Feature/add warnings to file watchers (#28206)
3c93f7d17ba is described below
commit 3c93f7d17ba2f1b7e7324dbad441bdb241b1e724
Author: johnjcasey <[email protected]>
AuthorDate: Fri Sep 1 10:00:55 2023 -0400
Feature/add warnings to file watchers (#28206)
* Update 2.50 release notes to include new Kafka topicPattern feature
* Add warning for using match continuously
* add match continuously warning to python as well
* fix lint errors
* fix formatting again
---
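Both new warnings make the same two claims: a continuous matcher has to remember the ids of every file it has already emitted, and because that record lives only in memory, a restarted pipeline emits those files again. The toy model below is a minimal sketch assuming a watcher that deduplicates against an in-process set. `InMemoryFileWatcher` is a hypothetical class written only to illustrate the claims; it is not how `FileIO.match().continuously()` or `MatchContinuously` are implemented.

```python
from typing import Iterable, List, Set


class InMemoryFileWatcher:
    """Hypothetical toy watcher that deduplicates matched files in memory.

    Not Beam's implementation; it only mirrors the two properties the
    warning describes.
    """

    def __init__(self) -> None:
        # Grows by one entry per distinct file ever seen: O(files) memory.
        self.seen: Set[str] = set()

    def poll(self, listing: Iterable[str]) -> List[str]:
        """Return files not emitted before, in listing order, and record them."""
        new_files: List[str] = []
        for path in listing:
            if path not in self.seen:
                self.seen.add(path)
                new_files.append(path)
        return new_files


if __name__ == "__main__":
    watcher = InMemoryFileWatcher()
    print(watcher.poll(["a.csv", "b.csv"]))           # ['a.csv', 'b.csv']
    print(watcher.poll(["a.csv", "b.csv", "c.csv"]))  # ['c.csv']
    print(len(watcher.seen))                          # 3
    # A "restart" is a fresh instance: the in-memory record is gone,
    # so every file in the listing is emitted again.
    restarted = InMemoryFileWatcher()
    print(restarted.poll(["a.csv", "b.csv", "c.csv"]))  # ['a.csv', 'b.csv', 'c.csv']
```

The `seen` set only ever grows, which is the scaling concern, and discarding the instance loses it, which is the reprocessing concern.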
.../core/src/main/java/org/apache/beam/sdk/io/FileIO.java | 9 +++++++++
sdks/python/apache_beam/io/fileio.py | 12 ++++++++++++
2 files changed, 21 insertions(+)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 5f319706e9e..2d28279f90b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -512,9 +512,18 @@ public class FileIO {
      * the watching frequency given by the {@code interval}. The pipeline will throw a {@code
      * RuntimeError} if timestamp extraction for the matched file has failed, suggesting the
      * timestamp metadata is not available with the IO connector.
+     *
+     * <p>Matching continuously scales poorly, as it is stateful, and requires storing file ids in
+     * memory. In addition, because it is memory-only, if a pipeline is restarted, already processed
+     * files will be reprocessed. Consider an alternate technique, such as <a
+     * href="https://cloud.google.com/storage/docs/pubsub-notifications">Pub/Sub Notifications</a>
+     * when using GCS if possible.
      */
     public MatchConfiguration continuously(
         Duration interval, TerminationCondition<String, ?> condition, boolean matchUpdatedFiles) {
+      LOG.warn(
+          "Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub "
+              + "Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if possible");
       return toBuilder()
           .setWatchInterval(interval)
           .setWatchTerminationCondition(condition)
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index a9297ca3d7a..23e979b44ca 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -266,6 +266,13 @@ class MatchContinuously(beam.PTransform):
   MatchContinuously is experimental. No backwards-compatibility
   guarantees.
+
+  Matching continuously scales poorly, as it is stateful, and requires storing
+  file ids in memory. In addition, because it is memory-only, if a pipeline is
+  restarted, already processed files will be reprocessed. Consider an alternate
+  technique, such as Pub/Sub Notifications
+  (https://cloud.google.com/storage/docs/pubsub-notifications)
+  when using GCS if possible.
   """
   def __init__(
       self,
@@ -299,6 +306,11 @@ class MatchContinuously(beam.PTransform):
     self.match_upd = match_updated_files
     self.apply_windowing = apply_windowing
     self.empty_match_treatment = empty_match_treatment
+    _LOGGER.warning(
+        'Matching Continuously is stateful, and can scale poorly. '
+        'Consider using Pub/Sub Notifications '
+        '(https://cloud.google.com/storage/docs/pubsub-notifications) '
+        'if possible')
   def expand(self, pbegin) -> beam.PCollection[filesystem.FileMetadata]:
     # invoke periodic impulse
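Both warnings point to Cloud Storage Pub/Sub Notifications as the alternative. One way to wire that up in the Python SDK is sketched below, assuming a subscription already attached to the bucket's notification topic and a pipeline run in streaming mode. `beam.io.ReadFromPubSub`, `fileio.MatchAll` and `fileio.ReadMatches` are existing Beam transforms; `notification_to_gcs_path` and `build_pipeline` are hypothetical helpers for this illustration only. The `eventType`, `bucketId` and `objectId` attribute names come from the Cloud Storage notification format.

```python
from typing import Mapping, Optional

# Event type Cloud Storage sends when a new object (or new generation) is written.
FINALIZE_EVENT = "OBJECT_FINALIZE"


def notification_to_gcs_path(attributes: Mapping[str, str]) -> Optional[str]:
    """Hypothetical helper: map a GCS notification's attributes to a gs:// path.

    Returns None for other events (deletes, metadata updates, archives) and
    for messages missing the bucket or object name.
    """
    if attributes.get("eventType") != FINALIZE_EVENT:
        return None
    bucket = attributes.get("bucketId")
    obj = attributes.get("objectId")
    if not bucket or not obj:
        return None
    return f"gs://{bucket}/{obj}"


def build_pipeline(pipeline, subscription: str):
    """Hypothetical wiring: notifications -> matched files -> readable files."""
    # Imported lazily so the helper above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io import fileio

    return (
        pipeline
        | "ReadNotifications" >> beam.io.ReadFromPubSub(
            subscription=subscription, with_attributes=True)
        | "ToPath" >> beam.Map(lambda msg: notification_to_gcs_path(msg.attributes))
        | "DropOtherEvents" >> beam.Filter(lambda path: path is not None)
        | "Match" >> fileio.MatchAll()
        | "Read" >> fileio.ReadMatches())
```

The pipeline keeps no growing set of seen file ids; each new object arrives as its own message. Pub/Sub delivery is at-least-once, though, so a notification can still be delivered more than once and may need deduplication downstream.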