This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b3f505e [BEAM-12437] Fix broken test from missing allow_unsafe_triggers
new 5a029fd Merge pull request #14919 from y1chi/BEAM-12437
b3f505e is described below
commit b3f505e3226e19357f9f3eafeded655fac159662
Author: Yichi Zhang <[email protected]>
AuthorDate: Tue Jun 1 09:53:02 2021 -0700
[BEAM-12437] Fix broken test from missing allow_unsafe_triggers
---
sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py | 4 +++-
sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py | 2 ++
sdks/python/apache_beam/transforms/trigger_test.py | 2 ++
3 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 9eb59b5..9672431 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -840,7 +840,9 @@ class BigQueryFileLoadsIT(unittest.TestCase):
data=[(i, ) for i in range(100)])
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True)
+ on_success_matcher=all_of(state_matcher, bq_matcher),
+ streaming=True,
+ allow_unsafe_triggers=True)
with beam.Pipeline(argv=args) as p:
stream_source = (
TestStream().advance_watermark_to(0).advance_processing_time(
diff --git a/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py b/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py
index 5186e09..1964082 100644
--- a/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py
+++ b/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py
@@ -102,6 +102,8 @@ def run_combine(pipeline, input_elements=5,
lift_combiners=True):
# Enable runtime type checking in order to cover TypeCheckCombineFn by
# the test.
pipeline.get_pipeline_options().view_as(TypeOptions).runtime_type_check =
True
+ pipeline.get_pipeline_options().view_as(
+ TypeOptions).allow_unsafe_triggers = True
with pipeline as p:
pcoll = p | 'Start' >> beam.Create(range(input_elements))
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 9e1a569..ed43094 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -32,6 +32,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.runners import pipeline_context
from apache_beam.runners.direct.clock import TestClock
@@ -1187,6 +1188,7 @@ class BaseTestStreamTranscriptTest(TranscriptTest):
with TestPipeline() as p:
# TODO(BEAM-8601): Pass this during pipeline construction.
p._options.view_as(StandardOptions).streaming = True
+ p._options.view_as(TypeOptions).allow_unsafe_triggers = True
# We can have at most one test stream per pipeline, so we share it.
inputs_and_expected = p | read_test_stream