This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ca674aad086 Remove unneeded Dataflow Runner v1 code. (#27196)
ca674aad086 is described below
commit ca674aad086e4d1a40e8c01598073030a22484fa
Author: Robert Bradshaw <[email protected]>
AuthorDate: Mon Jul 10 16:31:55 2023 -0700
Remove unneeded Dataflow Runner v1 code. (#27196)
---
sdks/python/apache_beam/coders/coders.py | 133 ----
.../apache_beam/coders/coders_test_common.py | 41 -
sdks/python/apache_beam/coders/row_coder.py | 9 -
sdks/python/apache_beam/coders/row_coder_test.py | 11 -
.../runners/dataflow/dataflow_runner.py | 828 +--------------------
.../runners/dataflow/dataflow_runner_test.py | 369 +--------
.../runners/dataflow/internal/apiclient.py | 95 +--
.../runners/dataflow/internal/apiclient_test.py | 53 --
.../apache_beam/runners/dataflow/internal/names.py | 76 --
.../runners/dataflow/ptransform_overrides.py | 126 ----
10 files changed, 49 insertions(+), 1692 deletions(-)
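For context, the bulk of this change removes the as_cloud_object methods that
built the Dataflow Runner v1 "cloud object" job-description dictionaries for
each coder; Runner v2 obtains coder information from the portable pipeline
proto instead. Below is a minimal illustrative sketch of the structure those
methods produced, based on the deleted test_tuple_coder assertion further
down (this snippet is editorial annotation, not part of the commit):

    # Shape of the removed Runner v1 cloud-object encoding for
    # coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())):
    kv_cloud_object = {
        '@type': 'kind:pair',
        'is_pair_like': True,
        'component_encodings': [
            {'@type': 'kind:varint'},
            {'@type': 'kind:bytes'},
        ],
    }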
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index d4ca99b80fb..7c5c8e09303 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -280,29 +280,6 @@ class Coder(object):
# refined in user-defined Coders.
return []
- def as_cloud_object(self, coders_context=None):
- """For internal use only; no backwards-compatibility guarantees.
-
- Returns Google Cloud Dataflow API description of this coder."""
- # This is an internal detail of the Coder API and does not need to be
- # refined in user-defined Coders.
-
- value = {
- # We pass coders in the form "<coder_name>$<pickled_data>" to make the
- # job description JSON more readable. Data before the $ is ignored by
- # the worker.
- '@type': serialize_coder(self),
- 'component_encodings': [
- component.as_cloud_object(coders_context)
- for component in self._get_component_coders()
- ],
- }
-
- if coders_context:
- value['pipeline_proto_coder_id'] = coders_context.get_id(self)
-
- return value
-
def __repr__(self):
return self.__class__.__name__
@@ -493,11 +470,6 @@ class BytesCoder(FastCoder):
def to_type_hint(self):
return bytes
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:bytes',
- }
-
def __eq__(self, other):
return type(self) == type(other)
@@ -667,11 +639,6 @@ class VarIntCoder(FastCoder):
def to_type_hint(self):
return int
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:varint',
- }
-
def __eq__(self, other):
return type(self) == type(other)
@@ -846,21 +813,6 @@ class _PickleCoderBase(FastCoder):
# GroupByKey operations.
return False
- def as_cloud_object(self, coders_context=None, is_pair_like=True):
- value = super().as_cloud_object(coders_context)
- # We currently use this coder in places where we cannot infer the coder to
- # use for the value type in a more granular way. In places where the
- # service expects a pair, it checks for the "is_pair_like" key, in which
- # case we would fail without the hack below.
- if is_pair_like:
- value['is_pair_like'] = True
- value['component_encodings'] = [
- self.as_cloud_object(coders_context, is_pair_like=False),
- self.as_cloud_object(coders_context, is_pair_like=False)
- ]
-
- return value
-
# We allow .key_coder() and .value_coder() to be called on PickleCoder since
# we can't always infer the return values of lambdas in ParDo operations, the
# result of which may be used in a GroupBykey.
@@ -983,21 +935,6 @@ class FastPrimitivesCoder(FastCoder):
def to_type_hint(self):
return Any
- def as_cloud_object(self, coders_context=None, is_pair_like=True):
- value = super().as_cloud_object(coders_context)
- # We currently use this coder in places where we cannot infer the coder to
- # use for the value type in a more granular way. In places where the
- # service expects a pair, it checks for the "is_pair_like" key, in which
- # case we would fail without the hack below.
- if is_pair_like:
- value['is_pair_like'] = True
- value['component_encodings'] = [
- self.as_cloud_object(coders_context, is_pair_like=False),
- self.as_cloud_object(coders_context, is_pair_like=False)
- ]
-
- return value
-
# We allow .key_coder() and .value_coder() to be called on FastPrimitivesCoder
# since we can't always infer the return values of lambdas in ParDo
# operations, the result of which may be used in a GroupBykey.
@@ -1231,19 +1168,6 @@ class TupleCoder(FastCoder):
# type: (typehints.TupleConstraint, CoderRegistry) -> TupleCoder
return cls([registry.get_coder(t) for t in typehint.tuple_types])
- def as_cloud_object(self, coders_context=None):
- if self.is_kv_coder():
- return {
- '@type': 'kind:pair',
- 'is_pair_like': True,
- 'component_encodings': [
- component.as_cloud_object(coders_context)
- for component in self._get_component_coders()
- ],
- }
-
- return super().as_cloud_object(coders_context)
-
def _get_component_coders(self):
# type: () -> Tuple[Coder, ...]
return self.coders()
@@ -1353,15 +1277,6 @@ class ListLikeCoder(FastCoder):
return type(self)(
self._elem_coder.as_deterministic_coder(step_label, error_message))
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:stream',
- 'is_stream_like': True,
- 'component_encodings': [
- self._elem_coder.as_cloud_object(coders_context)
- ],
- }
-
def value_coder(self):
return self._elem_coder
@@ -1409,11 +1324,6 @@ class GlobalWindowCoder(SingletonCoder):
from apache_beam.transforms import window
super().__init__(window.GlobalWindow())
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:global_window',
- }
-
Coder.register_structured_urn(
common_urns.coders.GLOBAL_WINDOW.urn, GlobalWindowCoder)
@@ -1428,11 +1338,6 @@ class IntervalWindowCoder(FastCoder):
# type: () -> bool
return True
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:interval_window',
- }
-
def __eq__(self, other):
return type(self) == type(other)
@@ -1466,16 +1371,6 @@ class WindowedValueCoder(FastCoder):
c.is_deterministic() for c in
[self.wrapped_value_coder, self.timestamp_coder, self.window_coder])
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:windowed_value',
- 'is_wrapper': True,
- 'component_encodings': [
- component.as_cloud_object(coders_context)
- for component in self._get_component_coders()
- ],
- }
-
def _get_component_coders(self):
# type: () -> List[Coder]
return [self.wrapped_value_coder, self.window_coder]
@@ -1527,10 +1422,6 @@ class ParamWindowedValueCoder(WindowedValueCoder):
# type: () -> bool
return self.wrapped_value_coder.is_deterministic()
- def as_cloud_object(self, coders_context=None):
- raise NotImplementedError(
- "as_cloud_object not supported for ParamWindowedValueCoder")
-
def __repr__(self):
return 'ParamWindowedValueCoder[%s]' % self.wrapped_value_coder
@@ -1577,14 +1468,6 @@ class LengthPrefixCoder(FastCoder):
def value_coder(self):
return self._value_coder
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:length_prefix',
- 'component_encodings': [
- self._value_coder.as_cloud_object(coders_context)
- ],
- }
-
def _get_component_coders(self):
# type: () -> Tuple[Coder, ...]
return (self._value_coder, )
@@ -1680,14 +1563,6 @@ class ShardedKeyCoder(FastCoder):
# type: () -> bool
return self._key_coder.is_deterministic()
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:sharded_key',
- 'component_encodings': [
- self._key_coder.as_cloud_object(coders_context)
- ],
- }
-
def to_type_hint(self):
from apache_beam.typehints import sharded_key_type
return sharded_key_type.ShardedKeyTypeConstraint(
@@ -1738,14 +1613,6 @@ class TimestampPrefixingWindowCoder(FastCoder):
def is_deterministic(self) -> bool:
return self._window_coder.is_deterministic()
- def as_cloud_object(self, coders_context=None):
- return {
- '@type': 'kind:custom_window',
- 'component_encodings': [
- self._window_coder.as_cloud_object(coders_context)
- ],
- }
-
def __repr__(self):
return 'TimestampPrefixingWindowCoder[%r]' % self._window_coder
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 8b6674aebec..70582e7992a 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -387,16 +387,6 @@ class CodersTest(unittest.TestCase):
def test_tuple_coder(self):
kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
- # Verify cloud object representation
- self.assertEqual({
- '@type': 'kind:pair',
- 'is_pair_like': True,
- 'component_encodings': [
- coders.VarIntCoder().as_cloud_object(),
- coders.BytesCoder().as_cloud_object()
- ],
- },
- kv_coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'\x04abc', kv_coder.encode((4, b'abc')))
# Test unnested
@@ -424,13 +414,6 @@ class CodersTest(unittest.TestCase):
def test_iterable_coder(self):
iterable_coder = coders.IterableCoder(coders.VarIntCoder())
- # Verify cloud object representation
- self.assertEqual({
- '@type': 'kind:stream',
- 'is_stream_like': True,
- 'component_encodings': [coders.VarIntCoder().as_cloud_object()]
- },
- iterable_coder.as_cloud_object())
# Test unnested
self.check_coder(iterable_coder, [1], [-1, 0, 100])
# Test nested
@@ -507,16 +490,6 @@ class CodersTest(unittest.TestCase):
def test_windowed_value_coder(self):
coder = coders.WindowedValueCoder(
coders.VarIntCoder(), coders.GlobalWindowCoder())
- # Verify cloud object representation
- self.assertEqual({
- '@type': 'kind:windowed_value',
- 'is_wrapper': True,
- 'component_encodings': [
- coders.VarIntCoder().as_cloud_object(),
- coders.GlobalWindowCoder().as_cloud_object(),
- ],
- },
- coder.as_cloud_object())
# Test binary representation
self.assertEqual(
b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
@@ -618,8 +591,6 @@ class CodersTest(unittest.TestCase):
def test_global_window_coder(self):
coder = coders.GlobalWindowCoder()
value = window.GlobalWindow()
- # Verify cloud object representation
- self.assertEqual({'@type': 'kind:global_window'}, coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'', coder.encode(value))
self.assertEqual(value, coder.decode(b''))
@@ -630,12 +601,6 @@ class CodersTest(unittest.TestCase):
def test_length_prefix_coder(self):
coder = coders.LengthPrefixCoder(coders.BytesCoder())
- # Verify cloud object representation
- self.assertEqual({
- '@type': 'kind:length_prefix',
- 'component_encodings': [coders.BytesCoder().as_cloud_object()]
- },
- coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'\x00', coder.encode(b''))
self.assertEqual(b'\x01a', coder.encode(b'a'))
@@ -725,12 +690,6 @@ class CodersTest(unittest.TestCase):
for key, bytes_repr, key_coder in key_and_coders:
coder = coders.ShardedKeyCoder(key_coder)
- # Verify cloud object representation
- self.assertEqual({
- '@type': 'kind:sharded_key',
- 'component_encodings': [key_coder.as_cloud_object()]
- },
- coder.as_cloud_object())
# Test str repr
self.assertEqual('%s' % coder, 'ShardedKeyCoder[%s]' % key_coder)
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index 19424fa1f12..7765ccebc26 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -17,8 +17,6 @@
# pytype: skip-file
-from google.protobuf import json_format
-
from apache_beam.coders import typecoders
from apache_beam.coders.coder_impl import LogicalTypeCoderImpl
from apache_beam.coders.coder_impl import RowCoderImpl
@@ -91,13 +89,6 @@ class RowCoder(FastCoder):
def to_type_hint(self):
return self._type_hint
- def as_cloud_object(self, coders_context=None):
- value = super().as_cloud_object(coders_context)
-
- value['schema'] = json_format.MessageToJson(self.schema).encode('utf-8')
-
- return value
-
def __hash__(self):
return hash(self.schema.SerializeToString())
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index dbca3e7f69c..6ac982835cb 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -22,7 +22,6 @@ import unittest
from itertools import chain
import numpy as np
-from google.protobuf import json_format
from numpy.testing import assert_array_equal
import apache_beam as beam
@@ -398,16 +397,6 @@ class RowCoderTest(unittest.TestCase):
self.assertRaisesRegex(
ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
- def test_row_coder_cloud_object_schema(self):
- schema_proto = schema_pb2.Schema(id='some-cloud-object-schema')
- schema_proto_json = json_format.MessageToJson(schema_proto).encode('utf-8')
-
- coder = RowCoder(schema_proto)
-
- cloud_object = coder.as_cloud_object()
-
- self.assertEqual(schema_proto_json, cloud_object['schema'])
-
def test_batch_encode_decode(self):
coder = RowCoder(typing_to_runner_api(Person).row_type.schema).get_impl()
seq_out = coder_impl.create_OutputStream()
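The RowCoder change above removes the schema field that Runner v1 needed in
the job description. A short illustrative sketch of what the deleted
RowCoder.as_cloud_object did (paraphrased from the removed lines; the helper
name and arguments here are hypothetical, used only for illustration):

    from google.protobuf import json_format

    def legacy_row_coder_cloud_object(base_cloud_object, schema_proto):
        # Augment the generic Coder.as_cloud_object dictionary with a
        # JSON-encoded copy of the row schema, as the deleted method did.
        value = dict(base_cloud_object)
        value['schema'] = json_format.MessageToJson(schema_proto).encode('utf-8')
        return value

With Runner v1 job construction gone, the schema travels in the portable
pipeline proto instead.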
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 674d05c64ec..315dc8ff700 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -22,26 +22,18 @@ to the Dataflow Service for remote execution by a worker.
"""
# pytype: skip-file
-import base64
import logging
import os
import threading
import time
-import traceback
import warnings
from collections import defaultdict
from subprocess import DEVNULL
from typing import TYPE_CHECKING
from typing import List
-from urllib.parse import quote
-from urllib.parse import quote_from_bytes
-from urllib.parse import unquote_to_bytes
import apache_beam as beam
from apache_beam import coders
-from apache_beam import error
-from apache_beam.internal import pickler
-from apache_beam.internal.gcp import json_value
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
@@ -51,23 +43,14 @@ from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.pvalue import AsSideInput
from apache_beam.runners.common import DoFnSignature
from apache_beam.runners.common import group_by_key_input_visitor
-from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
-from apache_beam.runners.dataflow.internal.names import PropertyNames
-from apache_beam.runners.dataflow.internal.names import TransformNames
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms import window
-from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX
from apache_beam.typehints import typehints
from apache_beam.utils import processes
-from apache_beam.utils import proto_utils
from apache_beam.utils.interactive_utils import is_in_notebook
from apache_beam.utils.plugin import BeamPlugin
@@ -103,9 +86,6 @@ class DataflowRunner(PipelineRunner):
# Imported here to avoid circular dependencies.
# TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride
- from apache_beam.runners.dataflow.ptransform_overrides import CombineValuesPTransformOverride
- from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride
- from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
from apache_beam.runners.dataflow.ptransform_overrides import NativeReadPTransformOverride
# These overrides should be applied before the proto representation of the
@@ -114,19 +94,7 @@ class DataflowRunner(PipelineRunner):
NativeReadPTransformOverride(),
] # type: List[PTransformOverride]
- # These overrides should be applied after the proto representation of the
- # graph is created.
- _NON_PORTABLE_PTRANSFORM_OVERRIDES = [
- CombineValuesPTransformOverride(),
- CreatePTransformOverride(),
- ReadPTransformOverride(),
- ] # type: List[PTransformOverride]
-
def __init__(self, cache=None):
- # Cache of CloudWorkflowStep protos generated while the runner
- # "executes" a pipeline.
- self._cache = cache if cache is not None else PValueCache()
- self._unique_step_id = 0
self._default_environment = None
def is_fnapi_compatible(self):
@@ -136,10 +104,6 @@ class DataflowRunner(PipelineRunner):
_check_and_add_missing_options(options)
return super().apply(transform, input, options)
- def _get_unique_step_name(self):
- self._unique_step_id += 1
- return 's%s' % self._unique_step_id
-
@staticmethod
def poll_for_job_completion(
runner, result, duration, state_update_callback=None):
@@ -262,7 +226,7 @@ class DataflowRunner(PipelineRunner):
return element
@staticmethod
- def side_input_visitor(is_runner_v2=False, deterministic_key_coders=True):
+ def side_input_visitor(deterministic_key_coders=True):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.pipeline import PipelineVisitor
@@ -300,9 +264,8 @@ class DataflowRunner(PipelineRunner):
'Unsupported access pattern for %r: %r' %
(transform_node.full_label, access_pattern))
new_side_inputs.append(new_side_input)
- if is_runner_v2:
- transform_node.side_inputs = new_side_inputs
- transform_node.transform.side_inputs = new_side_inputs
+ transform_node.side_inputs = new_side_inputs
+ transform_node.transform.side_inputs = new_side_inputs
return SideInputVisitor()
@@ -363,20 +326,12 @@ class DataflowRunner(PipelineRunner):
not pipeline._options.view_as(
TypeOptions).allow_non_deterministic_key_coders))
- def _check_for_unsupported_features_on_non_portable_worker(self, pipeline):
- pipeline.visit(self.combinefn_visitor())
-
def run_pipeline(self, pipeline, options, pipeline_proto=None):
"""Remotely executes entire pipeline or parts reachable from node."""
if _is_runner_v2_disabled(options):
- debug_options = options.view_as(DebugOptions)
- if not debug_options.lookup_experiment('disable_runner_v2_until_v2.50'):
- raise ValueError(
- 'disable_runner_v2 is deprecated in Beam Python ' +
- beam.version.__version__ +
- ' and this execution mode will be removed in a future Beam SDK. '
- 'If needed, please use: '
- '"--experiments=disable_runner_v2_until_v2.50".')
+ raise ValueError(
+ 'Disabling Runner V2 no longer supported '
+ 'using Beam Python %s.' % beam.version.__version__)
# Label goog-dataflow-notebook if job is started from notebook.
if is_in_notebook():
@@ -397,26 +352,12 @@ class DataflowRunner(PipelineRunner):
'Google Cloud Dataflow runner not available, '
'please install apache_beam[gcp]')
- if pipeline_proto or pipeline.contains_external_transforms:
- if _is_runner_v2_disabled(options):
- raise ValueError(
- 'This pipeline contains cross language transforms, '
- 'which requires Runner V2.')
- if not _is_runner_v2(options):
- _LOGGER.info(
- 'Automatically enabling Dataflow Runner V2 since the '
- 'pipeline used cross-language transforms.')
- _add_runner_v2_missing_options(options)
-
- is_runner_v2 = _is_runner_v2(options)
- if not is_runner_v2:
- self._check_for_unsupported_features_on_non_portable_worker(pipeline)
-
# Convert all side inputs into a form acceptable to Dataflow.
if pipeline:
+ pipeline.visit(self.combinefn_visitor())
+
pipeline.visit(
self.side_input_visitor(
- _is_runner_v2(options),
deterministic_key_coders=not options.view_as(
TypeOptions).allow_non_deterministic_key_coders))
@@ -430,10 +371,6 @@ class DataflowRunner(PipelineRunner):
"Native sinks no longer implemented; "
"ignoring use_legacy_bq_sink.")
- from apache_beam.runners.dataflow.ptransform_overrides import GroupIntoBatchesWithShardedKeyPTransformOverride
- pipeline.replace_all(
- [GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)])
-
if pipeline_proto:
self.proto_pipeline = pipeline_proto
@@ -449,7 +386,7 @@ class DataflowRunner(PipelineRunner):
self._default_environment.container_image)
else:
artifacts = environments.python_sdk_dependencies(options)
- if artifacts and _is_runner_v2(options):
+ if artifacts:
_LOGGER.info(
"Pipeline has additional dependencies to be installed "
"in SDK worker container, consider using the SDK "
@@ -501,11 +438,6 @@ class DataflowRunner(PipelineRunner):
known_runner_urns=frozenset(),
partial=True)
- if not is_runner_v2:
- # Performing configured PTransform overrides which should not be reflected
- # in the proto representation of the graph.
- pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES)
-
# Add setup_options for all the BeamPlugin imports
setup_options = options.view_as(SetupOptions)
plugins = BeamPlugin.get_all_plugin_paths()
@@ -523,16 +455,6 @@ class DataflowRunner(PipelineRunner):
self.job = apiclient.Job(options, self.proto_pipeline)
- # TODO: Consider skipping these for all use_portable_job_submission jobs.
- if pipeline:
- # Dataflow Runner v1 requires output type of the Flatten to be the same as
- # the inputs, hence we enforce that here. Dataflow Runner v2 does not
- # require this.
- pipeline.visit(self.flatten_input_visitor())
-
- # Trigger a traversal of all reachable nodes.
- self.visit_transforms(pipeline, options)
-
test_options = options.view_as(TestOptions)
# If it is a dry run, return without submitting the job.
if test_options.dry_run:
@@ -557,11 +479,6 @@ class DataflowRunner(PipelineRunner):
result.metric_results = self._metrics
return result
- def _get_typehint_based_encoding(self, typehint, window_coder):
- """Returns an encoding based on a typehint object."""
- return self._get_cloud_encoding(
- self._get_coder(typehint, window_coder=window_coder))
-
@staticmethod
def _get_coder(typehint, window_coder):
"""Returns a coder based on a typehint object."""
@@ -570,197 +487,6 @@ class DataflowRunner(PipelineRunner):
coders.registry.get_coder(typehint), window_coder=window_coder)
return coders.registry.get_coder(typehint)
- def _get_cloud_encoding(self, coder, unused=None):
- """Returns an encoding based on a coder object."""
- if not isinstance(coder, coders.Coder):
- raise TypeError(
- 'Coder object must inherit from coders.Coder: %s.' % str(coder))
- return coder.as_cloud_object(self.proto_context.coders)
-
- def _get_side_input_encoding(self, input_encoding):
- """Returns an encoding for the output of a view transform.
-
- Args:
- input_encoding: encoding of current transform's input. Side inputs need
- this because the service will check that input and output types match.
-
- Returns:
- An encoding that matches the output and input encoding. This is essential
- for the View transforms introduced to produce side inputs to a ParDo.
- """
- return {
- '@type': 'kind:stream',
- 'component_encodings': [input_encoding],
- 'is_stream_like': {
- 'value': True
- },
- }
-
- def _get_encoded_output_coder(
- self, transform_node, window_value=True, output_tag=None):
- """Returns the cloud encoding of the coder for the output of a
transform."""
-
- if output_tag in transform_node.outputs:
- element_type = transform_node.outputs[output_tag].element_type
- elif len(transform_node.outputs) == 1:
- output_tag = DataflowRunner._only_element(transform_node.outputs.keys())
- # TODO(robertwb): Handle type hints for multi-output transforms.
- element_type = transform_node.outputs[output_tag].element_type
-
- else:
- # TODO(silviuc): Remove this branch (and assert) when typehints are
- # propagated everywhere. Returning an 'Any' as type hint will trigger
- # usage of the fallback coder (i.e., cPickler).
- element_type = typehints.Any
- if window_value:
- # All outputs have the same windowing. So getting the coder from an
- # arbitrary window is fine.
- output_tag = next(iter(transform_node.outputs.keys()))
- window_coder = (
- transform_node.outputs[output_tag].windowing.windowfn.
- get_window_coder())
- else:
- window_coder = None
- return self._get_typehint_based_encoding(element_type, window_coder)
-
- def get_pcoll_with_auto_sharding(self):
- if not hasattr(self, '_pcoll_with_auto_sharding'):
- return set()
- return self._pcoll_with_auto_sharding
-
- def add_pcoll_with_auto_sharding(self, applied_ptransform):
- if not hasattr(self, '_pcoll_with_auto_sharding'):
- self.__setattr__('_pcoll_with_auto_sharding', set())
- output = DataflowRunner._only_element(applied_ptransform.outputs.keys())
- self._pcoll_with_auto_sharding.add(
- applied_ptransform.outputs[output]._unique_name())
-
- def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
- """Creates a Step object and adds it to the cache."""
- # Import here to avoid adding the dependency for local running scenarios.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.runners.dataflow.internal import apiclient
- step = apiclient.Step(step_kind, self._get_unique_step_name())
- self.job.proto.steps.append(step.proto)
- step.add_property(PropertyNames.USER_NAME, step_label)
- # Cache the node/step association for the main output of the transform node.
-
- # External transforms may not use 'None' as an output tag.
- output_tags = ([None] +
- list(side_tags) if None in transform_node.outputs.keys() else
- list(transform_node.outputs.keys()))
-
- # We have to cache output for all tags since some transforms may produce
- # multiple outputs.
- for output_tag in output_tags:
- self._cache.cache_output(transform_node, output_tag, step)
-
- # Finally, we add the display data items to the pipeline step.
- # If the transform contains no display data then an empty list is added.
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- [
- item.get_dict()
- for item in DisplayData.create_from(transform_node.transform).items
- ])
-
- if transform_node.resource_hints:
- step.add_property(
- PropertyNames.RESOURCE_HINTS,
- {
- hint: quote_from_bytes(value)
- for (hint, value) in transform_node.resource_hints.items()
- })
-
- return step
-
- def _add_singleton_step(
- self,
- label,
- full_label,
- tag,
- input_step,
- windowing_strategy,
- access_pattern):
- """Creates a CollectionToSingleton step used to handle ParDo side
inputs."""
- # Import here to avoid adding the dependency for local running scenarios.
- from apache_beam.runners.dataflow.internal import apiclient
- step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
- self.job.proto.steps.append(step.proto)
- step.add_property(PropertyNames.USER_NAME, full_label)
- step.add_property(
- PropertyNames.PARALLEL_INPUT,
- {
- '@type': 'OutputReference',
- PropertyNames.STEP_NAME: input_step.proto.name,
- PropertyNames.OUTPUT_NAME: input_step.get_output(tag)
- })
- step.encoding = self._get_side_input_encoding(input_step.encoding)
-
- output_info = {
- PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- }
- if common_urns.side_inputs.MULTIMAP.urn == access_pattern:
- output_info[PropertyNames.USE_INDEXED_FORMAT] = True
- step.add_property(PropertyNames.OUTPUT_INFO, [output_info])
-
- step.add_property(
- PropertyNames.WINDOWING_STRATEGY,
- self.serialize_windowing_strategy(
- windowing_strategy, self._default_environment))
- return step
-
- def run_Impulse(self, transform_node, options):
- step = self._add_step(
- TransformNames.READ, transform_node.full_label, transform_node)
- step.add_property(PropertyNames.FORMAT, 'impulse')
- encoded_impulse_element = coders.WindowedValueCoder(
- coders.BytesCoder(),
- coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
- window.GlobalWindows.windowed_value(b''))
- if _is_runner_v2(options):
- encoded_impulse_as_str = self.byte_array_to_json_string(
- encoded_impulse_element)
- else:
- encoded_impulse_as_str = base64.b64encode(encoded_impulse_element).decode(
- 'ascii')
-
- step.add_property(PropertyNames.IMPULSE_ELEMENT, encoded_impulse_as_str)
-
- step.encoding = self._get_encoded_output_coder(transform_node)
- step.add_property(
- PropertyNames.OUTPUT_INFO,
- [{
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- }])
-
- def run_Flatten(self, transform_node, options):
- step = self._add_step(
- TransformNames.FLATTEN, transform_node.full_label, transform_node)
- inputs = []
- for one_input in transform_node.inputs:
- input_step = self._cache.get_pvalue(one_input)
- inputs.append({
- '@type': 'OutputReference',
- PropertyNames.STEP_NAME: input_step.proto.name,
- PropertyNames.OUTPUT_NAME: input_step.get_output(one_input.tag)
- })
- step.add_property(PropertyNames.INPUTS, inputs)
- step.encoding = self._get_encoded_output_coder(transform_node)
- step.add_property(
- PropertyNames.OUTPUT_INFO,
- [{
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- }])
-
# TODO(srohde): Remove this after internal usages have been removed.
def apply_GroupByKey(self, transform, pcoll, options):
return transform.expand(pcoll)
@@ -784,512 +510,6 @@ class DataflowRunner(PipelineRunner):
coders.registry.verify_deterministic(
coder.key_coder(), 'GroupByKey operation "%s"' % transform.label)
- def run_GroupByKey(self, transform_node, options):
- input_tag = transform_node.inputs[0].tag
- input_step = self._cache.get_pvalue(transform_node.inputs[0])
-
- # Verify that the GBK's parent has a KV coder.
- self._verify_gbk_coders(transform_node.transform, transform_node.inputs[0])
-
- step = self._add_step(
- TransformNames.GROUP, transform_node.full_label, transform_node)
- step.add_property(
- PropertyNames.PARALLEL_INPUT,
- {
- '@type': 'OutputReference',
- PropertyNames.STEP_NAME: input_step.proto.name,
- PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
- })
- step.encoding = self._get_encoded_output_coder(transform_node)
- step.add_property(
- PropertyNames.OUTPUT_INFO,
- [{
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- }])
- windowing = transform_node.transform.get_windowing(transform_node.inputs)
- step.add_property(
- PropertyNames.SERIALIZED_FN,
- self.serialize_windowing_strategy(windowing, self._default_environment))
-
- def run_ExternalTransform(self, transform_node, options):
- # Adds a dummy step to the Dataflow job description so that inputs and
- # outputs are mapped correctly in the presence of external transforms.
- #
- # Note that Dataflow Python multi-language pipelines use Portable Job
- # Submission by default, hence this step and rest of the Dataflow step
- # definitions defined here are not used at Dataflow service but we have to
- # maintain the mapping correctly till we can fully drop the Dataflow step
- # definitions from the SDK.
-
- # AppliedTransform node outputs have to be updated to correctly map the
- # outputs for external transforms.
- transform_node.outputs = ({
- output.tag: output
- for output in transform_node.outputs.values()
- })
-
- self.run_Impulse(transform_node, options)
-
- def run_ParDo(self, transform_node, options):
- transform = transform_node.transform
- input_tag = transform_node.inputs[0].tag
- input_step = self._cache.get_pvalue(transform_node.inputs[0])
-
- # Attach side inputs.
- si_dict = {}
- si_labels = {}
- full_label_counts = defaultdict(int)
- lookup_label = lambda side_pval: si_labels[side_pval]
- named_inputs = transform_node.named_inputs()
- label_renames = {}
- for ix, side_pval in enumerate(transform_node.side_inputs):
- assert isinstance(side_pval, AsSideInput)
- step_name = 'SideInput-' + self._get_unique_step_name()
- si_label = ((SIDE_INPUT_PREFIX + '%d-%s') %
- (ix, transform_node.full_label))
- old_label = (SIDE_INPUT_PREFIX + '%d') % ix
-
- label_renames[old_label] = si_label
-
- assert old_label in named_inputs
- pcollection_label = '%s.%s' % (
- side_pval.pvalue.producer.full_label.split('/')[-1],
- side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
- si_full_label = '%s/%s(%s.%s)' % (
- transform_node.full_label,
- side_pval.__class__.__name__,
- pcollection_label,
- full_label_counts[pcollection_label])
-
- # Count the number of times the same PCollection is a side input
- # to the same ParDo.
- full_label_counts[pcollection_label] += 1
-
- self._add_singleton_step(
- step_name,
- si_full_label,
- side_pval.pvalue.tag,
- self._cache.get_pvalue(side_pval.pvalue),
- side_pval.pvalue.windowing,
- side_pval._side_input_data().access_pattern)
- si_dict[si_label] = {
- '@type': 'OutputReference',
- PropertyNames.STEP_NAME: step_name,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- }
- si_labels[side_pval] = si_label
-
- # Now create the step for the ParDo transform being handled.
- transform_name = transform_node.full_label.rsplit('/', 1)[-1]
- step = self._add_step(
- TransformNames.DO,
- transform_node.full_label +
- ('/{}'.format(transform_name) if transform_node.side_inputs else ''),
- transform_node,
- transform_node.transform.output_tags)
- transform_proto = self.proto_context.transforms.get_proto(transform_node)
- transform_id = self.proto_context.transforms.get_id(transform_node)
- is_runner_v2 = _is_runner_v2(options)
- # Patch side input ids to be unique across a given pipeline.
- if (label_renames and
- transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
- # Patch PTransform proto.
- for old, new in label_renames.items():
- transform_proto.inputs[new] = transform_proto.inputs[old]
- del transform_proto.inputs[old]
-
- # Patch ParDo proto.
- proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
- proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type)
- for old, new in label_renames.items():
- proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
- del proto.side_inputs[old]
- transform_proto.spec.payload = proto.SerializeToString()
- # We need to update the pipeline proto.
- del self.proto_pipeline.components.transforms[transform_id]
- (
- self.proto_pipeline.components.transforms[transform_id].CopyFrom(
- transform_proto))
- # The data transmitted in SERIALIZED_FN is different depending on whether
- # this is a runner v2 pipeline or not.
- if is_runner_v2:
- serialized_data = transform_id
- else:
- serialized_data = pickler.dumps(
- self._pardo_fn_data(transform_node, lookup_label))
- step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
- # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
- # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
- step.add_property(
- PropertyNames.PARALLEL_INPUT,
- {
- '@type': 'OutputReference',
- PropertyNames.STEP_NAME: input_step.proto.name,
- PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
- })
- # Add side inputs if any.
- step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)
-
- # Generate description for the outputs. The output names
- # will be 'None' for main output and '<tag>' for a tagged output.
- outputs = []
-
- all_output_tags = list(transform_proto.outputs.keys())
-
- # Some external transforms require output tags to not be modified.
- # So we randomly select one of the output tags as the main output and
- # leave others as side outputs. Transform execution should not change
- # dependending on which output tag we choose as the main output here.
- # Also, some SDKs do not work correctly if output tags are modified. So for
- # external transforms, we leave tags unmodified.
- #
- # Python SDK uses 'None' as the tag of the main output.
- main_output_tag = 'None'
-
- step.encoding = self._get_encoded_output_coder(
- transform_node, output_tag=main_output_tag)
-
- side_output_tags = set(all_output_tags).difference({main_output_tag})
-
- # Add the main output to the description.
- outputs.append({
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: main_output_tag
- })
- for side_tag in side_output_tags:
- # The assumption here is that all outputs will have the same typehint
- # and coder as the main output. This is certainly the case right now
- # but conceivably it could change in the future.
- encoding = self._get_encoded_output_coder(
- transform_node, output_tag=side_tag)
- outputs.append({
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, side_tag)),
- PropertyNames.ENCODING: encoding,
- PropertyNames.OUTPUT_NAME: side_tag
- })
-
- step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-
- # Add the restriction encoding if we are a splittable DoFn
- restriction_coder = transform.get_restriction_coder()
- if restriction_coder:
- step.add_property(
- PropertyNames.RESTRICTION_ENCODING,
- self._get_cloud_encoding(restriction_coder))
-
- if options.view_as(StandardOptions).streaming:
- is_stateful_dofn = (DoFnSignature(transform.dofn).is_stateful_dofn())
- if is_stateful_dofn:
- step.add_property(PropertyNames.USES_KEYED_STATE, 'true')
-
- # Also checks whether the step allows shardable keyed states.
- # TODO(BEAM-11360): remove this when migrated to portable job
- # submission since we only consider supporting the property in runner
- # v2.
- for pcoll in transform_node.outputs.values():
- if pcoll._unique_name() in self.get_pcoll_with_auto_sharding():
- step.add_property(PropertyNames.ALLOWS_SHARDABLE_STATE, 'true')
- # Currently we only allow auto-sharding to be enabled through the
- # GroupIntoBatches transform. So we also add the following property
- # which GroupIntoBatchesDoFn has, to allow the backend to perform
- # graph optimization.
- step.add_property(PropertyNames.PRESERVES_KEYS, 'true')
- break
-
- @staticmethod
- def _pardo_fn_data(transform_node, get_label):
- transform = transform_node.transform
- si_tags_and_types = [ # pylint: disable=protected-access
- (get_label(side_pval), side_pval.__class__, side_pval._view_options())
- for side_pval in transform_node.side_inputs]
- return (
- transform.fn,
- transform.args,
- transform.kwargs,
- si_tags_and_types,
- transform_node.inputs[0].windowing)
-
- def run_CombineValuesReplacement(self, transform_node, options):
- transform = transform_node.transform.transform
- input_tag = transform_node.inputs[0].tag
- input_step = self._cache.get_pvalue(transform_node.inputs[0])
- step = self._add_step(
- TransformNames.COMBINE, transform_node.full_label, transform_node)
- transform_id = self.proto_context.transforms.get_id(transform_node.parent)
-
- # The data transmitted in SERIALIZED_FN is different depending on whether
- # this is a runner v2 pipeline or not.
- if _is_runner_v2(options):
- # Fnapi pipelines send the transform ID of the CombineValues transform's
- # parent composite because Dataflow expects the ID of a CombinePerKey
- # transform.
- serialized_data = transform_id
- else:
- # Combiner functions do not take deferred side-inputs (i.e. PValues) and
- # therefore the code to handle extra args/kwargs is simpler than for the
- # DoFn's of the ParDo transform. In the last, empty argument is where
- # side inputs information would go.
- serialized_data = pickler.dumps(
- (transform.fn, transform.args, transform.kwargs, ()))
- step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
- # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
- # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
- step.add_property(
- PropertyNames.PARALLEL_INPUT,
- {
- '@type': 'OutputReference',
- PropertyNames.STEP_NAME: input_step.proto.name,
- PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
- })
- # Note that the accumulator must not have a WindowedValue encoding, while
- # the output of this step does in fact have a WindowedValue encoding.
- accumulator_encoding = self._get_cloud_encoding(
- transform.fn.get_accumulator_coder())
- output_encoding = self._get_encoded_output_coder(transform_node)
-
- step.encoding = output_encoding
- step.add_property(PropertyNames.ENCODING, accumulator_encoding)
- # Generate description for main output 'out.'
- outputs = []
- # Add the main output to the description.
- outputs.append({
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- })
- step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-
- def run_Read(self, transform_node, options):
- transform = transform_node.transform
- step = self._add_step(
- TransformNames.READ, transform_node.full_label, transform_node)
- # TODO(mairbek): refactor if-else tree to use registerable functions.
- # Initialize the source specific properties.
-
- standard_options = options.view_as(StandardOptions)
- if not hasattr(transform.source, 'format'):
- # If a format is not set, we assume the source to be a custom source.
- source_dict = {}
-
- source_dict['spec'] = {
- '@type': names.SOURCE_TYPE,
- names.SERIALIZED_SOURCE_KEY: pickler.dumps(transform.source)
- }
-
- try:
- source_dict['metadata'] = {
- 'estimated_size_bytes': json_value.get_typed_value_descriptor(
- transform.source.estimate_size())
- }
- except error.RuntimeValueProviderError:
- # Size estimation is best effort, and this error is by value provider.
- _LOGGER.info(
- 'Could not estimate size of source %r due to ' + \
- 'RuntimeValueProviderError', transform.source)
- except Exception: # pylint: disable=broad-except
- # Size estimation is best effort. So we log the error and continue.
- _LOGGER.info(
- 'Could not estimate size of source %r due to an exception: %s',
- transform.source,
- traceback.format_exc())
-
- step.add_property(PropertyNames.SOURCE_STEP_INPUT, source_dict)
- elif transform.source.format == 'pubsub':
- if not standard_options.streaming:
- raise ValueError(
- 'Cloud Pub/Sub is currently available for use '
- 'only in streaming pipelines.')
- # Only one of topic or subscription should be set.
- if transform.source.full_subscription:
- step.add_property(
- PropertyNames.PUBSUB_SUBSCRIPTION,
- transform.source.full_subscription)
- elif transform.source.full_topic:
- step.add_property(
- PropertyNames.PUBSUB_TOPIC, transform.source.full_topic)
- if transform.source.id_label:
- step.add_property(
- PropertyNames.PUBSUB_ID_LABEL, transform.source.id_label)
- if transform.source.with_attributes:
- # Setting this property signals Dataflow runner to return full
- # PubsubMessages instead of just the data part of the payload.
- step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
-
- if transform.source.timestamp_attribute is not None:
- step.add_property(
- PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
- transform.source.timestamp_attribute)
- else:
- raise ValueError(
- 'Source %r has unexpected format %s.' %
- (transform.source, transform.source.format))
-
- if not hasattr(transform.source, 'format'):
- step.add_property(PropertyNames.FORMAT, names.SOURCE_FORMAT)
- else:
- step.add_property(PropertyNames.FORMAT, transform.source.format)
-
- # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a
- # step should be the type of value outputted by each step. Read steps
- # automatically wrap output values in a WindowedValue wrapper, if
necessary.
- # This is also necessary for proper encoding for size estimation.
- # Using a GlobalWindowCoder as a place holder instead of the default
- # PickleCoder because GlobalWindowCoder is known coder.
- # TODO(robertwb): Query the collection for the windowfn to extract the
- # correct coder.
- coder = coders.WindowedValueCoder(
- coders.registry.get_coder(transform_node.outputs[None].element_type),
- coders.coders.GlobalWindowCoder())
-
- step.encoding = self._get_cloud_encoding(coder)
- step.add_property(
- PropertyNames.OUTPUT_INFO,
- [{
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- }])
-
- def run__NativeWrite(self, transform_node, options):
- transform = transform_node.transform
- input_tag = transform_node.inputs[0].tag
- input_step = self._cache.get_pvalue(transform_node.inputs[0])
- step = self._add_step(
- TransformNames.WRITE, transform_node.full_label, transform_node)
- # TODO(mairbek): refactor if-else tree to use registerable functions.
- # Initialize the sink specific properties.
- if transform.sink.format == 'pubsub':
- standard_options = options.view_as(StandardOptions)
- if not standard_options.streaming:
- raise ValueError(
- 'Cloud Pub/Sub is currently available for use '
- 'only in streaming pipelines.')
- step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.full_topic)
- if transform.sink.id_label:
- step.add_property(
- PropertyNames.PUBSUB_ID_LABEL, transform.sink.id_label)
- # Setting this property signals Dataflow runner that the PCollection
- # contains PubsubMessage objects instead of just raw data.
- step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
- if transform.sink.timestamp_attribute is not None:
- step.add_property(
- PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
- transform.sink.timestamp_attribute)
- else:
- raise ValueError(
- 'Sink %r has unexpected format %s.' %
- (transform.sink, transform.sink.format))
- step.add_property(PropertyNames.FORMAT, transform.sink.format)
-
- # Wrap coder in WindowedValueCoder: this is necessary for proper encoding
- # for size estimation. Using a GlobalWindowCoder as a place holder instead
- # of the default PickleCoder because GlobalWindowCoder is known coder.
- # TODO(robertwb): Query the collection for the windowfn to extract the
- # correct coder.
- coder = coders.WindowedValueCoder(
- transform.sink.coder, coders.coders.GlobalWindowCoder())
- step.encoding = self._get_cloud_encoding(coder)
- step.add_property(PropertyNames.ENCODING, step.encoding)
- step.add_property(
- PropertyNames.PARALLEL_INPUT,
- {
- '@type': 'OutputReference',
- PropertyNames.STEP_NAME: input_step.proto.name,
- PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
- })
-
- def run_TestStream(self, transform_node, options):
- from apache_beam.testing.test_stream import ElementEvent
- from apache_beam.testing.test_stream import ProcessingTimeEvent
- from apache_beam.testing.test_stream import WatermarkEvent
- standard_options = options.view_as(StandardOptions)
- if not standard_options.streaming:
- raise ValueError(
- 'TestStream is currently available for use '
- 'only in streaming pipelines.')
-
- transform = transform_node.transform
- step = self._add_step(
- TransformNames.READ, transform_node.full_label, transform_node)
- step.add_property(
- PropertyNames.SERIALIZED_FN,
- self.proto_context.transforms.get_id(transform_node))
- step.add_property(PropertyNames.FORMAT, 'test_stream')
- test_stream_payload = beam_runner_api_pb2.TestStreamPayload()
- # TestStream source doesn't do any decoding of elements,
- # so we won't set test_stream_payload.coder_id.
- output_coder = transform._infer_output_coder() # pylint: disable=protected-access
- for event in transform._events:
- new_event = test_stream_payload.events.add()
- if isinstance(event, ElementEvent):
- for tv in event.timestamped_values:
- element = new_event.element_event.elements.add()
- element.encoded_element = output_coder.encode(tv.value)
- element.timestamp = tv.timestamp.micros
- elif isinstance(event, ProcessingTimeEvent):
- new_event.processing_time_event.advance_duration = (
- event.advance_by.micros)
- elif isinstance(event, WatermarkEvent):
- new_event.watermark_event.new_watermark = event.new_watermark.micros
- serialized_payload = self.byte_array_to_json_string(
- test_stream_payload.SerializeToString())
- step.add_property(PropertyNames.SERIALIZED_TEST_STREAM, serialized_payload)
-
- step.encoding = self._get_encoded_output_coder(transform_node)
- step.add_property(
- PropertyNames.OUTPUT_INFO,
- [{
- PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT
- }])
-
- # We must mark this method as not a test or else its name is a matcher for
- # nosetest tests.
- run_TestStream.__test__ = False # type: ignore[attr-defined]
-
- @classmethod
- def serialize_windowing_strategy(cls, windowing, default_environment):
- from apache_beam.runners import pipeline_context
- context = pipeline_context.PipelineContext(
- default_environment=default_environment)
- windowing_proto = windowing.to_runner_api(context)
- return cls.byte_array_to_json_string(
- beam_runner_api_pb2.MessageWithComponents(
- components=context.to_runner_api(),
- windowing_strategy=windowing_proto).SerializeToString())
-
- @classmethod
- def deserialize_windowing_strategy(cls, serialized_data):
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.runners import pipeline_context
- from apache_beam.transforms.core import Windowing
- proto = beam_runner_api_pb2.MessageWithComponents()
- proto.ParseFromString(cls.json_string_to_byte_array(serialized_data))
- return Windowing.from_runner_api(
- proto.windowing_strategy,
- pipeline_context.PipelineContext(proto.components))
-
- @staticmethod
- def byte_array_to_json_string(raw_bytes):
- """Implements
org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
- return quote(raw_bytes)
-
- @staticmethod
- def json_string_to_byte_array(encoded_string):
- """Implements
org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
- return unquote_to_bytes(encoded_string)
-
def get_default_gcp_region(self):
"""Get a default value for Google Cloud region according to
https://cloud.google.com/compute/docs/gcloud-compute/#default-properties.
@@ -1348,6 +568,8 @@ def _check_and_add_missing_options(options):
options.view_as(
GoogleCloudOptions).dataflow_service_options = dataflow_service_options
+ _add_runner_v2_missing_options(options)
+
# Ensure that prime is specified as an experiment if specified as a dataflow
# service option
if 'enable_prime' in dataflow_service_options:
@@ -1359,11 +581,6 @@ def _check_and_add_missing_options(options):
# Runner v2 only supports using streaming engine (aka windmill service)
if options.view_as(StandardOptions).streaming:
google_cloud_options = options.view_as(GoogleCloudOptions)
- if _is_runner_v2_disabled(options):
- raise ValueError(
- 'Disabling Runner V2 no longer supported for streaming pipeline '
- 'using Beam Python %s.' % beam.version.__version__)
-
if (not google_cloud_options.enable_streaming_engine and
(debug_options.lookup_experiment("enable_windmill_service") or
debug_options.lookup_experiment("enable_streaming_engine"))):
@@ -1380,29 +597,6 @@ def _check_and_add_missing_options(options):
google_cloud_options.enable_streaming_engine = True
debug_options.add_experiment("enable_streaming_engine")
debug_options.add_experiment("enable_windmill_service")
- _add_runner_v2_missing_options(debug_options)
- elif (debug_options.lookup_experiment('enable_prime') or
- debug_options.lookup_experiment('beam_fn_api') or
- debug_options.lookup_experiment('use_unified_worker') or
- debug_options.lookup_experiment('use_runner_v2') or
- debug_options.lookup_experiment('use_portable_job_submission')):
- if _is_runner_v2_disabled(options):
- raise ValueError(
- """Runner V2 both disabled and enabled: at least one of
- ['enable_prime', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2',
- 'use_portable_job_submission'] is set and also one of
- ['disable_runner_v2', 'disable_runner_v2_until_2023',
- 'disable_prime_runner_v2'] is set.""")
- _add_runner_v2_missing_options(debug_options)
-
-
-def _is_runner_v2(options):
- # Type: (PipelineOptions) -> bool
-
- """Returns true if runner v2 is enabled."""
- _check_and_add_missing_options(options)
- return options.view_as(DebugOptions).lookup_experiment(
- 'use_runner_v2', default=False)
def _is_runner_v2_disabled(options):
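The dataflow_runner.py changes above also simplify how Runner v2 is selected:
_is_runner_v2 and the experiment-reconciliation branches are deleted,
_check_and_add_missing_options now always calls _add_runner_v2_missing_options,
and run_pipeline rejects any attempt to disable v2. A condensed illustrative
sketch of the resulting guard (editorial paraphrase with a hypothetical helper
name, not a verbatim excerpt of the new file):

    def reject_disabled_runner_v2(runner_v2_disabled, beam_version):
        # Runner v1 job construction no longer exists, so asking to disable
        # Runner v2 is a hard error rather than a fallback path.
        if runner_v2_disabled:
            raise ValueError(
                'Disabling Runner V2 no longer supported '
                'using Beam Python %s.' % beam_version)

    reject_disabled_runner_v2(False, '2.50.0')  # no-op for Runner v2 pipelines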
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index b00644d13fd..1e084b98278 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -19,19 +19,13 @@
# pytype: skip-file
-import json
import unittest
-from datetime import datetime
-from itertools import product
import mock
-from parameterized import param
-from parameterized import parameterized
import apache_beam as beam
import apache_beam.transforms as ptransform
from apache_beam.options.pipeline_options import DebugOptions
-from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import AppliedPTransform
from apache_beam.pipeline import Pipeline
@@ -45,18 +39,13 @@ from apache_beam.runners import common
from apache_beam.runners import create_runner
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
-from apache_beam.runners.dataflow.dataflow_runner import PropertyNames
-from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2
-from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled
+from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.extra_assertions import ExtraAssertionsMixin
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import combiners
from apache_beam.transforms import environments
-from apache_beam.transforms import window
-from apache_beam.transforms.core import Windowing
-from apache_beam.transforms.display import DisplayDataItem
from apache_beam.typehints import typehints
# Protect against environments where apitools library is not available.
@@ -262,49 +251,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())
- def test_remote_runner_display_data(self):
- remote_runner = DataflowRunner()
- p = Pipeline(
- remote_runner, options=PipelineOptions(self.default_properties))
-
- now = datetime.now()
- # pylint: disable=expression-not-assigned
- (
- p | ptransform.Create([1, 2, 3, 4, 5])
- | 'Do' >> SpecialParDo(SpecialDoFn(), now))
-
- # TODO(https://github.com/apache/beam/issues/18012) Enable runner API on
- # this test.
- p.run(test_runner_api=False)
- job_dict = json.loads(str(remote_runner.job))
- steps = [
- step for step in job_dict['steps']
- if len(step['properties'].get('display_data', [])) > 0
- ]
- step = steps[1]
- disp_data = step['properties']['display_data']
- nspace = SpecialParDo.__module__ + '.'
- expected_data = [{
- 'type': 'TIMESTAMP',
- 'namespace': nspace + 'SpecialParDo',
- 'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
- 'key': 'a_time'
- },
- {
- 'type': 'STRING',
- 'namespace': nspace + 'SpecialParDo',
- 'value': nspace + 'SpecialParDo',
- 'key': 'a_class',
- 'shortValue': 'SpecialParDo'
- },
- {
- 'type': 'INTEGER',
- 'namespace': nspace + 'SpecialDoFn',
- 'value': 42,
- 'key': 'dofn_value'
- }]
- self.assertUnhashableCountEqual(disp_data, expected_data)
-
def test_group_by_key_input_visitor_with_valid_inputs(self):
p = TestPipeline()
pcoll1 = PCollection(p)
@@ -391,15 +337,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
self.assertEqual(flat.element_type, none_str_pc.element_type)
self.assertEqual(flat.element_type, none_int_pc.element_type)
- def test_serialize_windowing_strategy(self):
- # This just tests the basic path; more complete tests
- # are in window_test.py.
- strategy = Windowing(window.FixedWindows(10))
- self.assertEqual(
- strategy,
- DataflowRunner.deserialize_windowing_strategy(
- DataflowRunner.serialize_windowing_strategy(strategy, None)))
-
def test_side_input_visitor(self):
p = TestPipeline()
pc = p | beam.Create([])
@@ -411,8 +348,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
beam.pvalue.AsSingleton(pc),
beam.pvalue.AsMultiMap(pc))
applied_transform = AppliedPTransform(None, transform, "label", {'pc': pc})
- DataflowRunner.side_input_visitor(
- is_runner_v2=True).visit_transform(applied_transform)
+ DataflowRunner.side_input_visitor().visit_transform(applied_transform)
self.assertEqual(2, len(applied_transform.side_inputs))
self.assertEqual(
common_urns.side_inputs.ITERABLE.urn,
@@ -504,116 +440,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
result = runner.get_default_gcp_region()
self.assertIsNone(result)
- def test_combine_values_translation(self):
- runner = DataflowRunner()
-
- with beam.Pipeline(runner=runner,
- options=PipelineOptions(self.default_properties)) as p:
- ( # pylint: disable=expression-not-assigned
- p
- | beam.Create([('a', [1, 2]), ('b', [3, 4])])
- | beam.CombineValues(lambda v, _: sum(v)))
-
- job_dict = json.loads(str(runner.job))
- self.assertIn(
- 'CombineValues', set(step['kind'] for step in job_dict['steps']))
-
- def _find_step(self, job, step_name):
- job_dict = json.loads(str(job))
- maybe_step = [
- s for s in job_dict['steps']
- if s['properties']['user_name'] == step_name
- ]
- self.assertTrue(maybe_step, 'Could not find step {}'.format(step_name))
- return maybe_step[0]
-
- def expect_correct_override(self, job, step_name, step_kind):
- """Expects that a transform was correctly overriden."""
-
- # If the typing information isn't being forwarded correctly, the component
- # encodings here will be incorrect.
- expected_output_info = [{
- "encoding": {
- "@type": "kind:windowed_value",
- "component_encodings": [{
- "@type": "kind:bytes"
- }, {
- "@type": "kind:global_window"
- }],
- "is_wrapper": True
- },
- "output_name": "out",
- "user_name": step_name + ".out"
- }]
-
- step = self._find_step(job, step_name)
- self.assertEqual(step['kind'], step_kind)
-
- # The display data here is forwarded because the replace transform is
- # subclassed from iobase.Read.
- self.assertGreater(len(step['properties']['display_data']), 0)
- self.assertEqual(step['properties']['output_info'], expected_output_info)
-
- def test_read_create_translation(self):
- runner = DataflowRunner()
-
- with beam.Pipeline(runner=runner,
- options=PipelineOptions(self.default_properties)) as p:
- # pylint: disable=expression-not-assigned
- p | beam.Create([b'a', b'b', b'c'])
-
- self.expect_correct_override(runner.job, 'Create/Read', 'ParallelRead')
-
- def test_read_pubsub_translation(self):
- runner = DataflowRunner()
-
- self.default_properties.append("--streaming")
-
- with beam.Pipeline(runner=runner,
- options=PipelineOptions(self.default_properties)) as p:
- # pylint: disable=expression-not-assigned
- p | beam.io.ReadFromPubSub(topic='projects/project/topics/topic')
-
- self.expect_correct_override(
- runner.job, 'ReadFromPubSub/Read', 'ParallelRead')
-
- def test_gbk_translation(self):
- runner = DataflowRunner()
- with beam.Pipeline(runner=runner,
- options=PipelineOptions(self.default_properties)) as p:
- # pylint: disable=expression-not-assigned
- p | beam.Create([(1, 2)]) | beam.GroupByKey()
-
- expected_output_info = [{
- "encoding": {
- "@type": "kind:windowed_value",
- "component_encodings": [{
- "@type": "kind:pair",
- "component_encodings": [{
- "@type": "kind:varint"
- },
- {
- "@type": "kind:stream",
- "component_encodings": [{
- "@type": "kind:varint"
- }],
- "is_stream_like": True
- }],
- "is_pair_like": True
- }, {
- "@type": "kind:global_window"
- }],
- "is_wrapper": True
- },
- "output_name": "out",
- "user_name": "GroupByKey.out"
- }] # yapf: disable
-
- gbk_step = self._find_step(runner.job, 'GroupByKey')
- self.assertEqual(gbk_step['kind'], 'GroupByKey')
- self.assertEqual(
- gbk_step['properties']['output_info'], expected_output_info)
-
@unittest.skip(
'https://github.com/apache/beam/issues/18716: enable once '
'CombineFnVisitor is fixed')
@@ -646,43 +472,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
except ValueError:
self.fail('ValueError raised unexpectedly')
- def _run_group_into_batches_and_get_step_properties(
- self, with_sharded_key, additional_properties):
- self.default_properties.append('--streaming')
- for property in additional_properties:
- self.default_properties.append(property)
-
- runner = DataflowRunner()
- with beam.Pipeline(runner=runner,
- options=PipelineOptions(self.default_properties)) as p:
- # pylint: disable=expression-not-assigned
- input = p | beam.Create([('a', 1), ('a', 1), ('b', 3), ('b', 4)])
- if with_sharded_key:
- (
- input | beam.GroupIntoBatches.WithShardedKey(2)
- | beam.Map(lambda key_values: (key_values[0].key, key_values[1])))
- step_name = (
- 'WithShardedKey/GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)')
- else:
- input | beam.GroupIntoBatches(2)
- step_name = 'GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)'
-
- return self._find_step(runner.job, step_name)['properties']
-
- def test_group_into_batches_translation(self):
- properties = self._run_group_into_batches_and_get_step_properties(
- True, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
- self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], 'true')
- self.assertEqual(properties[PropertyNames.ALLOWS_SHARDABLE_STATE], 'true')
- self.assertEqual(properties[PropertyNames.PRESERVES_KEYS], 'true')
-
- def test_group_into_batches_translation_non_sharded(self):
- properties = self._run_group_into_batches_and_get_step_properties(
- False, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
- self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], 'true')
- self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
- self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
-
def test_pack_combiners(self):
class PackableCombines(beam.PTransform):
def annotations(self):
@@ -711,141 +500,44 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
self.assertNotIn(unpacked_maximum_step_name, transform_names)
self.assertIn(packed_step_name, transform_names)
- @parameterized.expand([
- param(memory_hint='min_ram'),
- param(memory_hint='minRam'),
- ])
- def test_resource_hints_translation(self, memory_hint):
- runner = DataflowRunner()
- self.default_properties.append('--resource_hint=accelerator=some_gpu')
- self.default_properties.append(f'--resource_hint={memory_hint}=20GB')
- with beam.Pipeline(runner=runner,
- options=PipelineOptions(self.default_properties)) as p:
- # pylint: disable=expression-not-assigned
- (
- p
- | beam.Create([1])
- | 'MapWithHints' >> beam.Map(lambda x: x + 1).with_resource_hints(
- min_ram='10GB',
- accelerator='type:nvidia-tesla-k80;count:1;install-nvidia-drivers'
- ))
-
- step = self._find_step(runner.job, 'MapWithHints')
- self.assertEqual(
- step['properties']['resource_hints'],
- {
- 'beam:resources:min_ram_bytes:v1': '20000000000',
- 'beam:resources:accelerator:v1': \
- 'type%3Anvidia-tesla-k80%3Bcount%3A1%3Binstall-nvidia-drivers'
- })
-
- @parameterized.expand([
- (
- "%s_%s" % (enable_option, disable_option),
- enable_option,
- disable_option)
- for (enable_option,
- disable_option) in product([
- False,
- 'enable_prime',
- 'beam_fn_api',
- 'use_unified_worker',
- 'use_runner_v2',
- 'use_portable_job_submission'
- ],
- [
- False,
- 'disable_runner_v2',
- 'disable_runner_v2_until_2023',
- 'disable_prime_runner_v2'
- ])
- ])
- def test_batch_is_runner_v2(self, name, enable_option, disable_option):
- options = PipelineOptions(
- (['--experiments=%s' % enable_option] if enable_option else []) +
- (['--experiments=%s' % disable_option] if disable_option else []))
- if (enable_option and disable_option):
- with self.assertRaisesRegex(ValueError,
- 'Runner V2 both disabled and enabled'):
- _is_runner_v2(options)
- elif enable_option:
- self.assertTrue(_is_runner_v2(options))
- self.assertFalse(_is_runner_v2_disabled(options))
- for expected in ['beam_fn_api',
- 'use_unified_worker',
- 'use_runner_v2',
- 'use_portable_job_submission']:
- self.assertTrue(
- options.view_as(DebugOptions).lookup_experiment(expected, False))
- if enable_option == 'enable_prime':
- self.assertIn(
- 'enable_prime',
- options.view_as(GoogleCloudOptions).dataflow_service_options)
- elif disable_option:
- self.assertFalse(_is_runner_v2(options))
- self.assertTrue(_is_runner_v2_disabled(options))
- else:
- self.assertFalse(_is_runner_v2(options))
-
- @parameterized.expand([
- (
- "%s_%s" % (enable_option, disable_option),
- enable_option,
- disable_option)
- for (enable_option,
- disable_option) in product([
- False,
- 'enable_prime',
- 'beam_fn_api',
- 'use_unified_worker',
- 'use_runner_v2',
- 'use_portable_job_submission'
- ],
- [
- False,
- 'disable_runner_v2',
- 'disable_runner_v2_until_2023',
- 'disable_prime_runner_v2'
- ])
- ])
- def test_streaming_is_runner_v2(self, name, enable_option, disable_option):
- options = PipelineOptions(
- ['--streaming'] +
- (['--experiments=%s' % enable_option] if enable_option else []) +
- (['--experiments=%s' % disable_option] if disable_option else []))
- if disable_option:
- with self.assertRaisesRegex(
- ValueError,
- 'Disabling Runner V2 no longer supported for streaming pipeline'):
- _is_runner_v2(options)
- else:
- self.assertTrue(_is_runner_v2(options))
- for expected in ['beam_fn_api',
- 'use_unified_worker',
- 'use_runner_v2',
- 'use_portable_job_submission',
- 'enable_windmill_service',
- 'enable_streaming_engine']:
- self.assertTrue(
- options.view_as(DebugOptions).lookup_experiment(expected, False))
- if enable_option == 'enable_prime':
- self.assertIn(
- 'enable_prime',
- options.view_as(GoogleCloudOptions).dataflow_service_options)
+ def test_batch_is_runner_v2(self):
+ options = PipelineOptions()
+ _check_and_add_missing_options(options)
+ for expected in ['beam_fn_api',
+ 'use_unified_worker',
+ 'use_runner_v2',
+ 'use_portable_job_submission']:
+ self.assertTrue(
+ options.view_as(DebugOptions).lookup_experiment(expected, False),
+ expected)
+
+ def test_streaming_is_runner_v2(self):
+ options = PipelineOptions(['--streaming'])
+ _check_and_add_missing_options(options)
+ for expected in ['beam_fn_api',
+ 'use_unified_worker',
+ 'use_runner_v2',
+ 'use_portable_job_submission',
+ 'enable_windmill_service',
+ 'enable_streaming_engine']:
+ self.assertTrue(
+ options.view_as(DebugOptions).lookup_experiment(expected, False),
+ expected)
def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
options = PipelineOptions(['--dataflow_service_options=enable_prime'])
- self.assertTrue(_is_runner_v2(options))
+ _check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
'use_runner_v2',
'use_portable_job_submission']:
self.assertTrue(
- options.view_as(DebugOptions).lookup_experiment(expected, False))
+ options.view_as(DebugOptions).lookup_experiment(expected, False),
+ expected)
options = PipelineOptions(
['--streaming', '--dataflow_service_options=enable_prime'])
- self.assertTrue(_is_runner_v2(options))
+ _check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
'use_runner_v2',
@@ -853,7 +545,8 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
'enable_windmill_service',
'enable_streaming_engine']:
self.assertTrue(
- options.view_as(DebugOptions).lookup_experiment(expected, False))
+ options.view_as(DebugOptions).lookup_experiment(expected, False),
+ expected)
if __name__ == '__main__':
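
A minimal standalone sketch (not part of this commit) of how the helper exercised by the new tests above can be used; it assumes, as those tests imply, that _check_and_add_missing_options mutates the passed options object in place:

    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.dataflow.dataflow_runner import (
        _check_and_add_missing_options)

    # All Dataflow pipelines are now Runner V2 pipelines, so the helper adds
    # the corresponding experiments to whatever options it is given.
    options = PipelineOptions(['--streaming'])
    _check_and_add_missing_options(options)
    assert options.view_as(DebugOptions).lookup_experiment(
        'enable_streaming_engine', False)
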
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index bffcc6d6634..ff1beeab510 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -65,7 +65,6 @@ from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.common import validate_pipeline_graph
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow
-from apache_beam.runners.dataflow.internal.names import PropertyNames
from apache_beam.runners.internal import names as shared_names
from apache_beam.runners.portability.stager import Stager
from apache_beam.transforms import DataflowDistributionCounter
@@ -86,63 +85,6 @@ _LOGGER = logging.getLogger(__name__)
_PYTHON_VERSIONS_SUPPORTED_BY_DATAFLOW = ['3.8', '3.9', '3.10', '3.11']
-class Step(object):
- """Wrapper for a dataflow Step protobuf."""
- def __init__(self, step_kind, step_name, additional_properties=None):
- self.step_kind = step_kind
- self.step_name = step_name
- self.proto = dataflow.Step(kind=step_kind, name=step_name)
- self.proto.properties = {}
- self._additional_properties = []
-
- if additional_properties is not None:
- for (n, v, t) in additional_properties:
- self.add_property(n, v, t)
-
- def add_property(self, name, value, with_type=False):
- self._additional_properties.append((name, value, with_type))
- self.proto.properties.additionalProperties.append(
- dataflow.Step.PropertiesValue.AdditionalProperty(
- key=name, value=to_json_value(value, with_type=with_type)))
-
- def _get_outputs(self):
- """Returns a list of all output labels for a step."""
- outputs = []
- for p in self.proto.properties.additionalProperties:
- if p.key == PropertyNames.OUTPUT_INFO:
- for entry in p.value.array_value.entries:
- for entry_prop in entry.object_value.properties:
- if entry_prop.key == PropertyNames.OUTPUT_NAME:
- outputs.append(entry_prop.value.string_value)
- return outputs
-
- def __reduce__(self):
- """Reduce hook for pickling the Step class more easily."""
- return (Step, (self.step_kind, self.step_name, self._additional_properties))
-
- def get_output(self, tag=None):
- """Returns name if it is one of the outputs or first output if name is
None.
-
- Args:
- tag: tag of the output as a string or None if we want to get the
- name of the first output.
-
- Returns:
- The name of the output associated with the tag or the first output
- if tag was None.
-
- Raises:
- ValueError: if the tag does not exist within outputs.
- """
- outputs = self._get_outputs()
- if tag is None or len(outputs) == 1:
- return outputs[0]
- else:
- if tag not in outputs:
- raise ValueError('Cannot find named output: %s in %s.' % (tag, outputs))
- return tag
-
-
class Environment(object):
"""Wrapper for a dataflow Environment protobuf."""
def __init__(
@@ -152,7 +94,6 @@ class Environment(object):
environment_version,
proto_pipeline_staged_url,
proto_pipeline=None):
- from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2
self.standard_options = options.view_as(StandardOptions)
self.google_cloud_options = options.view_as(GoogleCloudOptions)
self.worker_options = options.view_as(WorkerOptions)
@@ -192,10 +133,7 @@ class Environment(object):
if self.standard_options.streaming:
job_type = 'FNAPI_STREAMING'
else:
- if _is_runner_v2(options):
- job_type = 'FNAPI_BATCH'
- else:
- job_type = 'PYTHON_BATCH'
+ job_type = 'FNAPI_BATCH'
self.proto.version.additionalProperties.extend([
dataflow.Environment.VersionValue.AdditionalProperty(
key='job_type', value=to_json_value(job_type)),
@@ -297,7 +235,7 @@ class Environment(object):
container_image.capabilities.append(capability)
pool.sdkHarnessContainerImages.append(container_image)
- if not _is_runner_v2(options) or not pool.sdkHarnessContainerImages:
+ if not pool.sdkHarnessContainerImages:
pool.workerHarnessContainerImage = (
get_container_image_from_options(options))
elif len(pool.sdkHarnessContainerImages) == 1:
@@ -554,11 +492,7 @@ class DataflowApplicationClient(object):
self._root_staging_location = (
root_staging_location or self.google_cloud_options.staging_location)
- from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2
- if _is_runner_v2(options):
- self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
- else:
- self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION
+ self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
if self.google_cloud_options.no_auth:
credentials = None
@@ -1202,46 +1136,31 @@ def get_container_image_from_options(pipeline_options):
Returns:
str: Container image for remote execution.
"""
- from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled
worker_options = pipeline_options.view_as(WorkerOptions)
if worker_options.sdk_container_image:
return worker_options.sdk_container_image
- is_runner_v2 = not _is_runner_v2_disabled(pipeline_options)
-
# Legacy and runner v2 exist in different repositories.
# Set to legacy format, override if runner v2
container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY
- image_name = '{repository}/python{major}{minor}'.format(
+ image_name = '{repository}/beam_python{major}.{minor}_sdk'.format(
repository=container_repo,
major=sys.version_info[0],
minor=sys.version_info[1])
- if is_runner_v2:
- image_name = '{repository}/beam_python{major}.{minor}_sdk'.format(
- repository=container_repo,
- major=sys.version_info[0],
- minor=sys.version_info[1])
-
- image_tag = _get_required_container_version(is_runner_v2)
+ image_tag = _get_required_container_version()
return image_name + ':' + image_tag
-def _get_required_container_version(is_runner_v2):
+def _get_required_container_version():
"""For internal use only; no backwards-compatibility guarantees.
- Args:
- is_runner_v2 (bool): True if and only if pipeline is using runner v2.
-
Returns:
str: The tag of worker container images in GCR that corresponds to
current version of the SDK.
"""
if 'dev' in beam_version.__version__:
- if is_runner_v2:
- return names.BEAM_FNAPI_CONTAINER_VERSION
- else:
- return names.BEAM_CONTAINER_VERSION
+ return names.BEAM_FNAPI_CONTAINER_VERSION
else:
return _get_container_image_tag()
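
For illustration, a small sketch (not part of this commit) of the image string the simplified get_container_image_from_options now produces for a dev SDK; the repository and tag literals below are the constants from names.py shown further down in this diff:

    import sys

    # Mirrors the single remaining code path: legacy image names are gone and
    # every pipeline gets the Runner V2 (FnAPI) SDK container.
    repository = 'gcr.io/cloud-dataflow/v1beta3'
    image_name = '{repository}/beam_python{major}.{minor}_sdk'.format(
        repository=repository,
        major=sys.version_info[0],
        minor=sys.version_info[1])
    print(image_name + ':' + 'beam-master-20230705')
    # e.g. gcr.io/cloud-dataflow/v1beta3/beam_python3.11_sdk:beam-master-20230705
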
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 89ac58a727b..22e779a8c27 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -653,25 +653,6 @@ class UtilTest(unittest.TestCase):
sys.version_info[1],
names.BEAM_FNAPI_CONTAINER_VERSION)))
- # batch, legacy pipeline.
- pipeline_options = pipeline_options = PipelineOptions([
- '--temp_location',
- 'gs://any-location/temp',
- '--experiments=disable_runner_v2_until_v2.50'
- ])
- env = apiclient.Environment(
- [], #packages
- pipeline_options,
- '2.0.0', #any environment version
- FAKE_PIPELINE_URL)
- self.assertEqual(
- env.proto.workerPools[0].workerHarnessContainerImage,
- (
- names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:%s' % (
- sys.version_info[0],
- sys.version_info[1],
- names.BEAM_CONTAINER_VERSION)))
-
@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.'
'beam_version.__version__',
@@ -706,23 +687,6 @@ class UtilTest(unittest.TestCase):
'/beam_python%d.%d_sdk:2.2.0' %
(sys.version_info[0], sys.version_info[1])))
- # batch, legacy pipeline.
- pipeline_options = pipeline_options = PipelineOptions([
- '--temp_location',
- 'gs://any-location/temp',
- '--experiments=disable_runner_v2_until_v2.50'
- ])
- env = apiclient.Environment(
- [], #packages
- pipeline_options,
- '2.0.0', #any environment version
- FAKE_PIPELINE_URL)
- self.assertEqual(
- env.proto.workerPools[0].workerHarnessContainerImage,
- (
- names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:2.2.0' %
- (sys.version_info[0], sys.version_info[1])))
-
@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.'
'beam_version.__version__',
@@ -757,23 +721,6 @@ class UtilTest(unittest.TestCase):
'/beam_python%d.%d_sdk:2.2.0' %
(sys.version_info[0], sys.version_info[1])))
- # batch, legacy pipeline
- pipeline_options = pipeline_options = PipelineOptions([
- '--temp_location',
- 'gs://any-location/temp',
- '--experiments=disable_runner_v2_until_v2.50'
- ])
- env = apiclient.Environment(
- [], #packages
- pipeline_options,
- '2.0.0', #any environment version
- FAKE_PIPELINE_URL)
- self.assertEqual(
- env.proto.workerPools[0].workerHarnessContainerImage,
- (
- names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:2.2.0' %
- (sys.version_info[0], sys.version_info[1])))
-
def test_worker_harness_override_takes_precedence_over_sdk_defaults(self):
# streaming, fnapi pipeline.
pipeline_options = PipelineOptions([
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index c0444037de8..f86306eb276 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -42,79 +42,3 @@ BEAM_CONTAINER_VERSION = 'beam-master-20230629'
BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20230705'
DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'
-
-
-class TransformNames(object):
- """For internal use only; no backwards-compatibility guarantees.
-
- Transform strings as they are expected in the CloudWorkflow protos.
- """
- COLLECTION_TO_SINGLETON = 'CollectionToSingleton'
- COMBINE = 'CombineValues'
- CREATE_PCOLLECTION = 'CreateCollection'
- DO = 'ParallelDo'
- FLATTEN = 'Flatten'
- GROUP = 'GroupByKey'
- READ = 'ParallelRead'
- WRITE = 'ParallelWrite'
-
-
-class PropertyNames(object):
- """For internal use only; no backwards-compatibility guarantees.
-
- Property strings as they are expected in the CloudWorkflow protos.
- """
- # If uses_keyed_state, whether the state can be sharded.
- ALLOWS_SHARDABLE_STATE = 'allows_shardable_state'
- BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
- BIGQUERY_DATASET = 'dataset'
- BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format'
- BIGQUERY_FLATTEN_RESULTS = 'bigquery_flatten_results'
- BIGQUERY_KMS_KEY = 'bigquery_kms_key'
- BIGQUERY_PROJECT = 'project'
- BIGQUERY_QUERY = 'bigquery_query'
- BIGQUERY_SCHEMA = 'schema'
- BIGQUERY_TABLE = 'table'
- BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
- BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
- DISPLAY_DATA = 'display_data'
- ELEMENT = 'element'
- ELEMENTS = 'elements'
- ENCODING = 'encoding'
- FILE_PATTERN = 'filepattern'
- FILE_NAME_PREFIX = 'filename_prefix'
- FILE_NAME_SUFFIX = 'filename_suffix'
- FORMAT = 'format'
- INPUTS = 'inputs'
- IMPULSE_ELEMENT = 'impulse_element'
- NON_PARALLEL_INPUTS = 'non_parallel_inputs'
- NUM_SHARDS = 'num_shards'
- OUT = 'out'
- OUTPUT = 'output'
- OUTPUT_INFO = 'output_info'
- OUTPUT_NAME = 'output_name'
- PARALLEL_INPUT = 'parallel_input'
- PIPELINE_PROTO_TRANSFORM_ID = 'pipeline_proto_transform_id'
- # If the input element is a key/value pair, then the output element(s) all
- # have the same key as the input.
- PRESERVES_KEYS = 'preserves_keys'
- PUBSUB_ID_LABEL = 'pubsub_id_label'
- PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
- PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
- PUBSUB_TIMESTAMP_ATTRIBUTE = 'pubsub_timestamp_label'
- PUBSUB_TOPIC = 'pubsub_topic'
- RESOURCE_HINTS = 'resource_hints'
- RESTRICTION_ENCODING = 'restriction_encoding'
- SERIALIZED_FN = 'serialized_fn'
- SHARD_NAME_TEMPLATE = 'shard_template'
- SOURCE_STEP_INPUT = 'custom_source_step_input'
- SERIALIZED_TEST_STREAM = 'serialized_test_stream'
- STEP_NAME = 'step_name'
- USE_INDEXED_FORMAT = 'use_indexed_format'
- USER_FN = 'user_fn'
- USER_NAME = 'user_name'
- USES_KEYED_STATE = 'uses_keyed_state'
- VALIDATE_SINK = 'validate_sink'
- VALIDATE_SOURCE = 'validate_source'
- VALUE = 'value'
- WINDOWING_STRATEGY = 'windowing_strategy'
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 1012a7d3624..8004762f5ee 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -19,101 +19,9 @@
# pytype: skip-file
-from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.pipeline import PTransformOverride
-class CreatePTransformOverride(PTransformOverride):
- """A ``PTransformOverride`` for ``Create`` in streaming mode."""
- def matches(self, applied_ptransform):
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam import Create
- return isinstance(applied_ptransform.transform, Create)
-
- def get_replacement_transform_for_applied_ptransform(
- self, applied_ptransform):
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam import PTransform
-
- ptransform = applied_ptransform.transform
-
- # Return a wrapper rather than ptransform.as_read() directly to
- # ensure backwards compatibility of the pipeline structure.
- class LegacyCreate(PTransform):
- def expand(self, pbegin):
- return pbegin | ptransform.as_read()
-
- return LegacyCreate().with_output_types(ptransform.get_output_type())
-
-
-class ReadPTransformOverride(PTransformOverride):
- """A ``PTransformOverride`` for ``Read(BoundedSource)``"""
- def matches(self, applied_ptransform):
- from apache_beam.io import Read
- from apache_beam.io.iobase import BoundedSource
- # Only overrides Read(BoundedSource) transform
- if (isinstance(applied_ptransform.transform, Read) and
- not getattr(applied_ptransform.transform, 'override', False)):
- if isinstance(applied_ptransform.transform.source, BoundedSource):
- return True
- return False
-
- def get_replacement_transform_for_applied_ptransform(
- self, applied_ptransform):
-
- from apache_beam import pvalue
- from apache_beam.io import iobase
-
- transform = applied_ptransform.transform
-
- class Read(iobase.Read):
- override = True
-
- def expand(self, pbegin):
- return pvalue.PCollection(
- self.pipeline, is_bounded=self.source.is_bounded())
-
- return Read(transform.source).with_output_types(
- transform.get_type_hints().simple_output_type('Read'))
-
-
-class CombineValuesPTransformOverride(PTransformOverride):
- """A ``PTransformOverride`` for ``CombineValues``.
-
- The DataflowRunner expects that the CombineValues PTransform acts as a
- primitive. So this override replaces the CombineValues with a primitive.
- """
- def matches(self, applied_ptransform):
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam import CombineValues
-
- if isinstance(applied_ptransform.transform, CombineValues):
- self.transform = applied_ptransform.transform
- return True
- return False
-
- def get_replacement_transform(self, ptransform):
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam import PTransform
- from apache_beam.pvalue import PCollection
-
- # The DataflowRunner still needs access to the CombineValues members to
- # generate a V1B3 proto representation, so we remember the transform from
- # the matches method and forward it here.
- class CombineValuesReplacement(PTransform):
- def __init__(self, transform):
- self.transform = transform
-
- def expand(self, pcoll):
- return PCollection.from_(pcoll)
-
- return CombineValuesReplacement(self.transform)
-
-
class NativeReadPTransformOverride(PTransformOverride):
"""A ``PTransformOverride`` for ``Read`` using native sources.
@@ -150,37 +58,3 @@ class NativeReadPTransformOverride(PTransformOverride):
# will choose the incorrect coder for this transform.
return Read(ptransform.source).with_output_types(
ptransform.source.coder.to_type_hint())
-
-
-class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride):
- """A ``PTransformOverride`` for ``GroupIntoBatches.WithShardedKey``.
-
- This override simply returns the original transform but additionally records
- the output PCollection in order to append required step properties during
- graph translation.
- """
- def __init__(self, dataflow_runner, options):
- self.dataflow_runner = dataflow_runner
- self.options = options
-
- def matches(self, applied_ptransform):
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam import util
-
- transform = applied_ptransform.transform
-
- if not isinstance(transform, util.GroupIntoBatches.WithShardedKey):
- return False
-
- # The replacement is only valid for portable Streaming Engine jobs with
- # runner v2.
- standard_options = self.options.view_as(StandardOptions)
- if not standard_options.streaming:
- return False
-
- self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform)
- return True
-
- def get_replacement_transform_for_applied_ptransform(self, ptransform):
- return ptransform.transform
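
With the Runner v1 overrides deleted, NativeReadPTransformOverride is the only override left in this module. For reference, a minimal hypothetical sketch of the PTransformOverride interface the removed classes implemented (the method names match the removed code above; NoOpOverride itself is illustrative only):

    from apache_beam.pipeline import PTransformOverride

    class NoOpOverride(PTransformOverride):
        """Illustrative override that matches nothing and replaces nothing."""
        def matches(self, applied_ptransform):
            # Return True when this override should replace the transform.
            return False

        def get_replacement_transform_for_applied_ptransform(
                self, applied_ptransform):
            # Return the transform to substitute; here the original is kept.
            return applied_ptransform.transform
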