chamikaramj commented on code in PR #27444:
URL: https://github.com/apache/beam/pull/27444#discussion_r1260034538
##########
sdks/python/apache_beam/runners/direct/direct_runner.py:
##########
@@ -83,18 +83,18 @@ def accept(self, pipeline):
pipeline.visit(self)
return self.supported_by_fnapi_runner
+ def enter_composite_transform(self, applied_ptransform):
+ # The FnApiRunner does not support streaming execution.
+ if isinstance(applied_ptransform.transform,
+ (ReadFromPubSub, WriteToPubSub)):
+ self.supported_by_fnapi_runner = False
Review Comment:
We don't need to set this for the source anymore ?
##########
sdks/python/apache_beam/runners/direct/direct_runner.py:
##########
@@ -83,18 +83,18 @@ def accept(self, pipeline):
pipeline.visit(self)
return self.supported_by_fnapi_runner
+ def enter_composite_transform(self, applied_ptransform):
+ # The FnApiRunner does not support streaming execution.
+ if isinstance(applied_ptransform.transform,
+ (ReadFromPubSub, WriteToPubSub)):
+ self.supported_by_fnapi_runner = False
+
def visit_transform(self, applied_ptransform):
transform = applied_ptransform.transform
+ print(applied_ptransform, transform)
Review Comment:
Delete print ?
##########
sdks/python/apache_beam/pipeline_test.py:
##########
@@ -61,39 +60,9 @@
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MIN_TIMESTAMP
-# TODO(BEAM-1555): Test is failing on the service, with FakeSource.
-
-class FakeSource(NativeSource):
- """Fake source returning a fixed list of values."""
- class _Reader(object):
- def __init__(self, vals):
- self._vals = vals
- self._output_counter = Metrics.counter('main', 'outputs')
-
- def __enter__(self):
- return self
-
- def __exit__(self, exception_type, exception_value, traceback):
- pass
-
- def __iter__(self):
- for v in self._vals:
- self._output_counter.inc()
- yield v
-
- def __init__(self, vals):
- self._vals = vals
-
- def reader(self):
- return FakeSource._Reader(self._vals)
-
-
-class FakeUnboundedSource(NativeSource):
+class FakeUnboundedSource(SourceBase):
Review Comment:
Probably good to update CHANGES.md since we are deleting some public APIs
here (though we don't expect anyone other than Dataflow to have used these).
##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -261,6 +261,7 @@ def __init__(
timestamp_attribute=timestamp_attribute)
def expand(self, pvalue):
+ # TODO(BEAM-27443): Apply a proper transform rather than Read.
Review Comment:
Also, is there a reason we cannot do this refactoring here ? (just for my
information).
##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -261,6 +261,7 @@ def __init__(
timestamp_attribute=timestamp_attribute)
def expand(self, pvalue):
+ # TODO(BEAM-27443): Apply a proper transform rather than Read.
Review Comment:
Nit: `TODO(https://github.com/apache/beam/issues/27443)` might be better so
that we don't have to do URL modification to find the correct link.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]