This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ca674aad086 Remove unneeded Dataflow Runner v1 code. (#27196)
ca674aad086 is described below

commit ca674aad086e4d1a40e8c01598073030a22484fa
Author: Robert Bradshaw <[email protected]>
AuthorDate: Mon Jul 10 16:31:55 2023 -0700

    Remove unneeded Dataflow Runner v1 code. (#27196)
---
 sdks/python/apache_beam/coders/coders.py           | 133 ----
 .../apache_beam/coders/coders_test_common.py       |  41 -
 sdks/python/apache_beam/coders/row_coder.py        |   9 -
 sdks/python/apache_beam/coders/row_coder_test.py   |  11 -
 .../runners/dataflow/dataflow_runner.py            | 828 +--------------------
 .../runners/dataflow/dataflow_runner_test.py       | 369 +--------
 .../runners/dataflow/internal/apiclient.py         |  95 +--
 .../runners/dataflow/internal/apiclient_test.py    |  53 --
 .../apache_beam/runners/dataflow/internal/names.py |  76 --
 .../runners/dataflow/ptransform_overrides.py       | 126 ----
 10 files changed, 49 insertions(+), 1692 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coders.py 
b/sdks/python/apache_beam/coders/coders.py
index d4ca99b80fb..7c5c8e09303 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -280,29 +280,6 @@ class Coder(object):
     # refined in user-defined Coders.
     return []
 
-  def as_cloud_object(self, coders_context=None):
-    """For internal use only; no backwards-compatibility guarantees.
-
-    Returns Google Cloud Dataflow API description of this coder."""
-    # This is an internal detail of the Coder API and does not need to be
-    # refined in user-defined Coders.
-
-    value = {
-        # We pass coders in the form "<coder_name>$<pickled_data>" to make the
-        # job description JSON more readable.  Data before the $ is ignored by
-        # the worker.
-        '@type': serialize_coder(self),
-        'component_encodings': [
-            component.as_cloud_object(coders_context)
-            for component in self._get_component_coders()
-        ],
-    }
-
-    if coders_context:
-      value['pipeline_proto_coder_id'] = coders_context.get_id(self)
-
-    return value
-
   def __repr__(self):
     return self.__class__.__name__
 
@@ -493,11 +470,6 @@ class BytesCoder(FastCoder):
   def to_type_hint(self):
     return bytes
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:bytes',
-    }
-
   def __eq__(self, other):
     return type(self) == type(other)
 
@@ -667,11 +639,6 @@ class VarIntCoder(FastCoder):
   def to_type_hint(self):
     return int
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:varint',
-    }
-
   def __eq__(self, other):
     return type(self) == type(other)
 
@@ -846,21 +813,6 @@ class _PickleCoderBase(FastCoder):
     # GroupByKey operations.
     return False
 
-  def as_cloud_object(self, coders_context=None, is_pair_like=True):
-    value = super().as_cloud_object(coders_context)
-    # We currently use this coder in places where we cannot infer the coder to
-    # use for the value type in a more granular way.  In places where the
-    # service expects a pair, it checks for the "is_pair_like" key, in which
-    # case we would fail without the hack below.
-    if is_pair_like:
-      value['is_pair_like'] = True
-      value['component_encodings'] = [
-          self.as_cloud_object(coders_context, is_pair_like=False),
-          self.as_cloud_object(coders_context, is_pair_like=False)
-      ]
-
-    return value
-
   # We allow .key_coder() and .value_coder() to be called on PickleCoder since
   # we can't always infer the return values of lambdas in ParDo operations, the
   # result of which may be used in a GroupBykey.
@@ -983,21 +935,6 @@ class FastPrimitivesCoder(FastCoder):
   def to_type_hint(self):
     return Any
 
-  def as_cloud_object(self, coders_context=None, is_pair_like=True):
-    value = super().as_cloud_object(coders_context)
-    # We currently use this coder in places where we cannot infer the coder to
-    # use for the value type in a more granular way.  In places where the
-    # service expects a pair, it checks for the "is_pair_like" key, in which
-    # case we would fail without the hack below.
-    if is_pair_like:
-      value['is_pair_like'] = True
-      value['component_encodings'] = [
-          self.as_cloud_object(coders_context, is_pair_like=False),
-          self.as_cloud_object(coders_context, is_pair_like=False)
-      ]
-
-    return value
-
   # We allow .key_coder() and .value_coder() to be called on 
FastPrimitivesCoder
   # since we can't always infer the return values of lambdas in ParDo
   # operations, the result of which may be used in a GroupBykey.
@@ -1231,19 +1168,6 @@ class TupleCoder(FastCoder):
     # type: (typehints.TupleConstraint, CoderRegistry) -> TupleCoder
     return cls([registry.get_coder(t) for t in typehint.tuple_types])
 
-  def as_cloud_object(self, coders_context=None):
-    if self.is_kv_coder():
-      return {
-          '@type': 'kind:pair',
-          'is_pair_like': True,
-          'component_encodings': [
-              component.as_cloud_object(coders_context)
-              for component in self._get_component_coders()
-          ],
-      }
-
-    return super().as_cloud_object(coders_context)
-
   def _get_component_coders(self):
     # type: () -> Tuple[Coder, ...]
     return self.coders()
@@ -1353,15 +1277,6 @@ class ListLikeCoder(FastCoder):
       return type(self)(
           self._elem_coder.as_deterministic_coder(step_label, error_message))
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:stream',
-        'is_stream_like': True,
-        'component_encodings': [
-            self._elem_coder.as_cloud_object(coders_context)
-        ],
-    }
-
   def value_coder(self):
     return self._elem_coder
 
@@ -1409,11 +1324,6 @@ class GlobalWindowCoder(SingletonCoder):
     from apache_beam.transforms import window
     super().__init__(window.GlobalWindow())
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:global_window',
-    }
-
 
 Coder.register_structured_urn(
     common_urns.coders.GLOBAL_WINDOW.urn, GlobalWindowCoder)
@@ -1428,11 +1338,6 @@ class IntervalWindowCoder(FastCoder):
     # type: () -> bool
     return True
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:interval_window',
-    }
-
   def __eq__(self, other):
     return type(self) == type(other)
 
@@ -1466,16 +1371,6 @@ class WindowedValueCoder(FastCoder):
         c.is_deterministic() for c in
         [self.wrapped_value_coder, self.timestamp_coder, self.window_coder])
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:windowed_value',
-        'is_wrapper': True,
-        'component_encodings': [
-            component.as_cloud_object(coders_context)
-            for component in self._get_component_coders()
-        ],
-    }
-
   def _get_component_coders(self):
     # type: () -> List[Coder]
     return [self.wrapped_value_coder, self.window_coder]
@@ -1527,10 +1422,6 @@ class ParamWindowedValueCoder(WindowedValueCoder):
     # type: () -> bool
     return self.wrapped_value_coder.is_deterministic()
 
-  def as_cloud_object(self, coders_context=None):
-    raise NotImplementedError(
-        "as_cloud_object not supported for ParamWindowedValueCoder")
-
   def __repr__(self):
     return 'ParamWindowedValueCoder[%s]' % self.wrapped_value_coder
 
@@ -1577,14 +1468,6 @@ class LengthPrefixCoder(FastCoder):
   def value_coder(self):
     return self._value_coder
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:length_prefix',
-        'component_encodings': [
-            self._value_coder.as_cloud_object(coders_context)
-        ],
-    }
-
   def _get_component_coders(self):
     # type: () -> Tuple[Coder, ...]
     return (self._value_coder, )
@@ -1680,14 +1563,6 @@ class ShardedKeyCoder(FastCoder):
     # type: () -> bool
     return self._key_coder.is_deterministic()
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:sharded_key',
-        'component_encodings': [
-            self._key_coder.as_cloud_object(coders_context)
-        ],
-    }
-
   def to_type_hint(self):
     from apache_beam.typehints import sharded_key_type
     return sharded_key_type.ShardedKeyTypeConstraint(
@@ -1738,14 +1613,6 @@ class TimestampPrefixingWindowCoder(FastCoder):
   def is_deterministic(self) -> bool:
     return self._window_coder.is_deterministic()
 
-  def as_cloud_object(self, coders_context=None):
-    return {
-        '@type': 'kind:custom_window',
-        'component_encodings': [
-            self._window_coder.as_cloud_object(coders_context)
-        ],
-    }
-
   def __repr__(self):
     return 'TimestampPrefixingWindowCoder[%r]' % self._window_coder
 
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index 8b6674aebec..70582e7992a 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -387,16 +387,6 @@ class CodersTest(unittest.TestCase):
 
   def test_tuple_coder(self):
     kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
-    # Verify cloud object representation
-    self.assertEqual({
-        '@type': 'kind:pair',
-        'is_pair_like': True,
-        'component_encodings': [
-            coders.VarIntCoder().as_cloud_object(),
-            coders.BytesCoder().as_cloud_object()
-        ],
-    },
-                     kv_coder.as_cloud_object())
     # Test binary representation
     self.assertEqual(b'\x04abc', kv_coder.encode((4, b'abc')))
     # Test unnested
@@ -424,13 +414,6 @@ class CodersTest(unittest.TestCase):
 
   def test_iterable_coder(self):
     iterable_coder = coders.IterableCoder(coders.VarIntCoder())
-    # Verify cloud object representation
-    self.assertEqual({
-        '@type': 'kind:stream',
-        'is_stream_like': True,
-        'component_encodings': [coders.VarIntCoder().as_cloud_object()]
-    },
-                     iterable_coder.as_cloud_object())
     # Test unnested
     self.check_coder(iterable_coder, [1], [-1, 0, 100])
     # Test nested
@@ -507,16 +490,6 @@ class CodersTest(unittest.TestCase):
   def test_windowed_value_coder(self):
     coder = coders.WindowedValueCoder(
         coders.VarIntCoder(), coders.GlobalWindowCoder())
-    # Verify cloud object representation
-    self.assertEqual({
-        '@type': 'kind:windowed_value',
-        'is_wrapper': True,
-        'component_encodings': [
-            coders.VarIntCoder().as_cloud_object(),
-            coders.GlobalWindowCoder().as_cloud_object(),
-        ],
-    },
-                     coder.as_cloud_object())
     # Test binary representation
     self.assertEqual(
         b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
@@ -618,8 +591,6 @@ class CodersTest(unittest.TestCase):
   def test_global_window_coder(self):
     coder = coders.GlobalWindowCoder()
     value = window.GlobalWindow()
-    # Verify cloud object representation
-    self.assertEqual({'@type': 'kind:global_window'}, coder.as_cloud_object())
     # Test binary representation
     self.assertEqual(b'', coder.encode(value))
     self.assertEqual(value, coder.decode(b''))
@@ -630,12 +601,6 @@ class CodersTest(unittest.TestCase):
 
   def test_length_prefix_coder(self):
     coder = coders.LengthPrefixCoder(coders.BytesCoder())
-    # Verify cloud object representation
-    self.assertEqual({
-        '@type': 'kind:length_prefix',
-        'component_encodings': [coders.BytesCoder().as_cloud_object()]
-    },
-                     coder.as_cloud_object())
     # Test binary representation
     self.assertEqual(b'\x00', coder.encode(b''))
     self.assertEqual(b'\x01a', coder.encode(b'a'))
@@ -725,12 +690,6 @@ class CodersTest(unittest.TestCase):
 
     for key, bytes_repr, key_coder in key_and_coders:
       coder = coders.ShardedKeyCoder(key_coder)
-      # Verify cloud object representation
-      self.assertEqual({
-          '@type': 'kind:sharded_key',
-          'component_encodings': [key_coder.as_cloud_object()]
-      },
-                       coder.as_cloud_object())
 
       # Test str repr
       self.assertEqual('%s' % coder, 'ShardedKeyCoder[%s]' % key_coder)
diff --git a/sdks/python/apache_beam/coders/row_coder.py 
b/sdks/python/apache_beam/coders/row_coder.py
index 19424fa1f12..7765ccebc26 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -17,8 +17,6 @@
 
 # pytype: skip-file
 
-from google.protobuf import json_format
-
 from apache_beam.coders import typecoders
 from apache_beam.coders.coder_impl import LogicalTypeCoderImpl
 from apache_beam.coders.coder_impl import RowCoderImpl
@@ -91,13 +89,6 @@ class RowCoder(FastCoder):
   def to_type_hint(self):
     return self._type_hint
 
-  def as_cloud_object(self, coders_context=None):
-    value = super().as_cloud_object(coders_context)
-
-    value['schema'] = json_format.MessageToJson(self.schema).encode('utf-8')
-
-    return value
-
   def __hash__(self):
     return hash(self.schema.SerializeToString())
 
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py 
b/sdks/python/apache_beam/coders/row_coder_test.py
index dbca3e7f69c..6ac982835cb 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -22,7 +22,6 @@ import unittest
 from itertools import chain
 
 import numpy as np
-from google.protobuf import json_format
 from numpy.testing import assert_array_equal
 
 import apache_beam as beam
@@ -398,16 +397,6 @@ class RowCoderTest(unittest.TestCase):
     self.assertRaisesRegex(
         ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
 
-  def test_row_coder_cloud_object_schema(self):
-    schema_proto = schema_pb2.Schema(id='some-cloud-object-schema')
-    schema_proto_json = json_format.MessageToJson(schema_proto).encode('utf-8')
-
-    coder = RowCoder(schema_proto)
-
-    cloud_object = coder.as_cloud_object()
-
-    self.assertEqual(schema_proto_json, cloud_object['schema'])
-
   def test_batch_encode_decode(self):
     coder = RowCoder(typing_to_runner_api(Person).row_type.schema).get_impl()
     seq_out = coder_impl.create_OutputStream()
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 674d05c64ec..315dc8ff700 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -22,26 +22,18 @@ to the Dataflow Service for remote execution by a worker.
 """
 # pytype: skip-file
 
-import base64
 import logging
 import os
 import threading
 import time
-import traceback
 import warnings
 from collections import defaultdict
 from subprocess import DEVNULL
 from typing import TYPE_CHECKING
 from typing import List
-from urllib.parse import quote
-from urllib.parse import quote_from_bytes
-from urllib.parse import unquote_to_bytes
 
 import apache_beam as beam
 from apache_beam import coders
-from apache_beam import error
-from apache_beam.internal import pickler
-from apache_beam.internal.gcp import json_value
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import SetupOptions
@@ -51,23 +43,14 @@ from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.pvalue import AsSideInput
 from apache_beam.runners.common import DoFnSignature
 from apache_beam.runners.common import group_by_key_input_visitor
-from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow as 
dataflow_api
-from apache_beam.runners.dataflow.internal.names import PropertyNames
-from apache_beam.runners.dataflow.internal.names import TransformNames
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms import window
-from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX
 from apache_beam.typehints import typehints
 from apache_beam.utils import processes
-from apache_beam.utils import proto_utils
 from apache_beam.utils.interactive_utils import is_in_notebook
 from apache_beam.utils.plugin import BeamPlugin
 
@@ -103,9 +86,6 @@ class DataflowRunner(PipelineRunner):
 
   # Imported here to avoid circular dependencies.
   # TODO: Remove the apache_beam.pipeline dependency in 
CreatePTransformOverride
-  from apache_beam.runners.dataflow.ptransform_overrides import 
CombineValuesPTransformOverride
-  from apache_beam.runners.dataflow.ptransform_overrides import 
CreatePTransformOverride
-  from apache_beam.runners.dataflow.ptransform_overrides import 
ReadPTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import 
NativeReadPTransformOverride
 
   # These overrides should be applied before the proto representation of the
@@ -114,19 +94,7 @@ class DataflowRunner(PipelineRunner):
       NativeReadPTransformOverride(),
   ]  # type: List[PTransformOverride]
 
-  # These overrides should be applied after the proto representation of the
-  # graph is created.
-  _NON_PORTABLE_PTRANSFORM_OVERRIDES = [
-      CombineValuesPTransformOverride(),
-      CreatePTransformOverride(),
-      ReadPTransformOverride(),
-  ]  # type: List[PTransformOverride]
-
   def __init__(self, cache=None):
-    # Cache of CloudWorkflowStep protos generated while the runner
-    # "executes" a pipeline.
-    self._cache = cache if cache is not None else PValueCache()
-    self._unique_step_id = 0
     self._default_environment = None
 
   def is_fnapi_compatible(self):
@@ -136,10 +104,6 @@ class DataflowRunner(PipelineRunner):
     _check_and_add_missing_options(options)
     return super().apply(transform, input, options)
 
-  def _get_unique_step_name(self):
-    self._unique_step_id += 1
-    return 's%s' % self._unique_step_id
-
   @staticmethod
   def poll_for_job_completion(
       runner, result, duration, state_update_callback=None):
@@ -262,7 +226,7 @@ class DataflowRunner(PipelineRunner):
     return element
 
   @staticmethod
-  def side_input_visitor(is_runner_v2=False, deterministic_key_coders=True):
+  def side_input_visitor(deterministic_key_coders=True):
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.pipeline import PipelineVisitor
@@ -300,9 +264,8 @@ class DataflowRunner(PipelineRunner):
                   'Unsupported access pattern for %r: %r' %
                   (transform_node.full_label, access_pattern))
             new_side_inputs.append(new_side_input)
-          if is_runner_v2:
-            transform_node.side_inputs = new_side_inputs
-            transform_node.transform.side_inputs = new_side_inputs
+          transform_node.side_inputs = new_side_inputs
+          transform_node.transform.side_inputs = new_side_inputs
 
     return SideInputVisitor()
 
@@ -363,20 +326,12 @@ class DataflowRunner(PipelineRunner):
             not pipeline._options.view_as(
                 TypeOptions).allow_non_deterministic_key_coders))
 
-  def _check_for_unsupported_features_on_non_portable_worker(self, pipeline):
-    pipeline.visit(self.combinefn_visitor())
-
   def run_pipeline(self, pipeline, options, pipeline_proto=None):
     """Remotely executes entire pipeline or parts reachable from node."""
     if _is_runner_v2_disabled(options):
-      debug_options = options.view_as(DebugOptions)
-      if not debug_options.lookup_experiment('disable_runner_v2_until_v2.50'):
-        raise ValueError(
-            'disable_runner_v2 is deprecated in Beam Python ' +
-            beam.version.__version__ +
-            ' and this execution mode will be removed in a future Beam SDK. '
-            'If needed, please use: '
-            '"--experiments=disable_runner_v2_until_v2.50".')
+      raise ValueError(
+          'Disabling Runner V2 no longer supported '
+          'using Beam Python %s.' % beam.version.__version__)
 
     # Label goog-dataflow-notebook if job is started from notebook.
     if is_in_notebook():
@@ -397,26 +352,12 @@ class DataflowRunner(PipelineRunner):
           'Google Cloud Dataflow runner not available, '
           'please install apache_beam[gcp]')
 
-    if pipeline_proto or pipeline.contains_external_transforms:
-      if _is_runner_v2_disabled(options):
-        raise ValueError(
-            'This pipeline contains cross language transforms, '
-            'which requires Runner V2.')
-      if not _is_runner_v2(options):
-        _LOGGER.info(
-            'Automatically enabling Dataflow Runner V2 since the '
-            'pipeline used cross-language transforms.')
-        _add_runner_v2_missing_options(options)
-
-    is_runner_v2 = _is_runner_v2(options)
-    if not is_runner_v2:
-      self._check_for_unsupported_features_on_non_portable_worker(pipeline)
-
     # Convert all side inputs into a form acceptable to Dataflow.
     if pipeline:
+      pipeline.visit(self.combinefn_visitor())
+
       pipeline.visit(
           self.side_input_visitor(
-              _is_runner_v2(options),
               deterministic_key_coders=not options.view_as(
                   TypeOptions).allow_non_deterministic_key_coders))
 
@@ -430,10 +371,6 @@ class DataflowRunner(PipelineRunner):
             "Native sinks no longer implemented; "
             "ignoring use_legacy_bq_sink.")
 
-      from apache_beam.runners.dataflow.ptransform_overrides import 
GroupIntoBatchesWithShardedKeyPTransformOverride
-      pipeline.replace_all(
-          [GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)])
-
     if pipeline_proto:
       self.proto_pipeline = pipeline_proto
 
@@ -449,7 +386,7 @@ class DataflowRunner(PipelineRunner):
             self._default_environment.container_image)
       else:
         artifacts = environments.python_sdk_dependencies(options)
-        if artifacts and _is_runner_v2(options):
+        if artifacts:
           _LOGGER.info(
               "Pipeline has additional dependencies to be installed "
               "in SDK worker container, consider using the SDK "
@@ -501,11 +438,6 @@ class DataflowRunner(PipelineRunner):
             known_runner_urns=frozenset(),
             partial=True)
 
-    if not is_runner_v2:
-      # Performing configured PTransform overrides which should not be 
reflected
-      # in the proto representation of the graph.
-      pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES)
-
     # Add setup_options for all the BeamPlugin imports
     setup_options = options.view_as(SetupOptions)
     plugins = BeamPlugin.get_all_plugin_paths()
@@ -523,16 +455,6 @@ class DataflowRunner(PipelineRunner):
 
     self.job = apiclient.Job(options, self.proto_pipeline)
 
-    # TODO: Consider skipping these for all use_portable_job_submission jobs.
-    if pipeline:
-      # Dataflow Runner v1 requires output type of the Flatten to be the same 
as
-      # the inputs, hence we enforce that here. Dataflow Runner v2 does not
-      # require this.
-      pipeline.visit(self.flatten_input_visitor())
-
-      # Trigger a traversal of all reachable nodes.
-      self.visit_transforms(pipeline, options)
-
     test_options = options.view_as(TestOptions)
     # If it is a dry run, return without submitting the job.
     if test_options.dry_run:
@@ -557,11 +479,6 @@ class DataflowRunner(PipelineRunner):
     result.metric_results = self._metrics
     return result
 
-  def _get_typehint_based_encoding(self, typehint, window_coder):
-    """Returns an encoding based on a typehint object."""
-    return self._get_cloud_encoding(
-        self._get_coder(typehint, window_coder=window_coder))
-
   @staticmethod
   def _get_coder(typehint, window_coder):
     """Returns a coder based on a typehint object."""
@@ -570,197 +487,6 @@ class DataflowRunner(PipelineRunner):
           coders.registry.get_coder(typehint), window_coder=window_coder)
     return coders.registry.get_coder(typehint)
 
-  def _get_cloud_encoding(self, coder, unused=None):
-    """Returns an encoding based on a coder object."""
-    if not isinstance(coder, coders.Coder):
-      raise TypeError(
-          'Coder object must inherit from coders.Coder: %s.' % str(coder))
-    return coder.as_cloud_object(self.proto_context.coders)
-
-  def _get_side_input_encoding(self, input_encoding):
-    """Returns an encoding for the output of a view transform.
-
-    Args:
-      input_encoding: encoding of current transform's input. Side inputs need
-        this because the service will check that input and output types match.
-
-    Returns:
-      An encoding that matches the output and input encoding. This is essential
-      for the View transforms introduced to produce side inputs to a ParDo.
-    """
-    return {
-        '@type': 'kind:stream',
-        'component_encodings': [input_encoding],
-        'is_stream_like': {
-            'value': True
-        },
-    }
-
-  def _get_encoded_output_coder(
-      self, transform_node, window_value=True, output_tag=None):
-    """Returns the cloud encoding of the coder for the output of a 
transform."""
-
-    if output_tag in transform_node.outputs:
-      element_type = transform_node.outputs[output_tag].element_type
-    elif len(transform_node.outputs) == 1:
-      output_tag = DataflowRunner._only_element(transform_node.outputs.keys())
-      # TODO(robertwb): Handle type hints for multi-output transforms.
-      element_type = transform_node.outputs[output_tag].element_type
-
-    else:
-      # TODO(silviuc): Remove this branch (and assert) when typehints are
-      # propagated everywhere. Returning an 'Any' as type hint will trigger
-      # usage of the fallback coder (i.e., cPickler).
-      element_type = typehints.Any
-    if window_value:
-      # All outputs have the same windowing. So getting the coder from an
-      # arbitrary window is fine.
-      output_tag = next(iter(transform_node.outputs.keys()))
-      window_coder = (
-          transform_node.outputs[output_tag].windowing.windowfn.
-          get_window_coder())
-    else:
-      window_coder = None
-    return self._get_typehint_based_encoding(element_type, window_coder)
-
-  def get_pcoll_with_auto_sharding(self):
-    if not hasattr(self, '_pcoll_with_auto_sharding'):
-      return set()
-    return self._pcoll_with_auto_sharding
-
-  def add_pcoll_with_auto_sharding(self, applied_ptransform):
-    if not hasattr(self, '_pcoll_with_auto_sharding'):
-      self.__setattr__('_pcoll_with_auto_sharding', set())
-    output = DataflowRunner._only_element(applied_ptransform.outputs.keys())
-    self._pcoll_with_auto_sharding.add(
-        applied_ptransform.outputs[output]._unique_name())
-
-  def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
-    """Creates a Step object and adds it to the cache."""
-    # Import here to avoid adding the dependency for local running scenarios.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.internal import apiclient
-    step = apiclient.Step(step_kind, self._get_unique_step_name())
-    self.job.proto.steps.append(step.proto)
-    step.add_property(PropertyNames.USER_NAME, step_label)
-    # Cache the node/step association for the main output of the transform 
node.
-
-    # External transforms may not use 'None' as an output tag.
-    output_tags = ([None] +
-                   list(side_tags) if None in transform_node.outputs.keys() 
else
-                   list(transform_node.outputs.keys()))
-
-    # We have to cache output for all tags since some transforms may produce
-    # multiple outputs.
-    for output_tag in output_tags:
-      self._cache.cache_output(transform_node, output_tag, step)
-
-    # Finally, we add the display data items to the pipeline step.
-    # If the transform contains no display data then an empty list is added.
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        [
-            item.get_dict()
-            for item in DisplayData.create_from(transform_node.transform).items
-        ])
-
-    if transform_node.resource_hints:
-      step.add_property(
-          PropertyNames.RESOURCE_HINTS,
-          {
-              hint: quote_from_bytes(value)
-              for (hint, value) in transform_node.resource_hints.items()
-          })
-
-    return step
-
-  def _add_singleton_step(
-      self,
-      label,
-      full_label,
-      tag,
-      input_step,
-      windowing_strategy,
-      access_pattern):
-    """Creates a CollectionToSingleton step used to handle ParDo side 
inputs."""
-    # Import here to avoid adding the dependency for local running scenarios.
-    from apache_beam.runners.dataflow.internal import apiclient
-    step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
-    self.job.proto.steps.append(step.proto)
-    step.add_property(PropertyNames.USER_NAME, full_label)
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {
-            '@type': 'OutputReference',
-            PropertyNames.STEP_NAME: input_step.proto.name,
-            PropertyNames.OUTPUT_NAME: input_step.get_output(tag)
-        })
-    step.encoding = self._get_side_input_encoding(input_step.encoding)
-
-    output_info = {
-        PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT),
-        PropertyNames.ENCODING: step.encoding,
-        PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-    }
-    if common_urns.side_inputs.MULTIMAP.urn == access_pattern:
-      output_info[PropertyNames.USE_INDEXED_FORMAT] = True
-    step.add_property(PropertyNames.OUTPUT_INFO, [output_info])
-
-    step.add_property(
-        PropertyNames.WINDOWING_STRATEGY,
-        self.serialize_windowing_strategy(
-            windowing_strategy, self._default_environment))
-    return step
-
-  def run_Impulse(self, transform_node, options):
-    step = self._add_step(
-        TransformNames.READ, transform_node.full_label, transform_node)
-    step.add_property(PropertyNames.FORMAT, 'impulse')
-    encoded_impulse_element = coders.WindowedValueCoder(
-        coders.BytesCoder(),
-        coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
-            window.GlobalWindows.windowed_value(b''))
-    if _is_runner_v2(options):
-      encoded_impulse_as_str = self.byte_array_to_json_string(
-          encoded_impulse_element)
-    else:
-      encoded_impulse_as_str = 
base64.b64encode(encoded_impulse_element).decode(
-          'ascii')
-
-    step.add_property(PropertyNames.IMPULSE_ELEMENT, encoded_impulse_as_str)
-
-    step.encoding = self._get_encoded_output_coder(transform_node)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{
-            PropertyNames.USER_NAME: (
-                '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-            PropertyNames.ENCODING: step.encoding,
-            PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-        }])
-
-  def run_Flatten(self, transform_node, options):
-    step = self._add_step(
-        TransformNames.FLATTEN, transform_node.full_label, transform_node)
-    inputs = []
-    for one_input in transform_node.inputs:
-      input_step = self._cache.get_pvalue(one_input)
-      inputs.append({
-          '@type': 'OutputReference',
-          PropertyNames.STEP_NAME: input_step.proto.name,
-          PropertyNames.OUTPUT_NAME: input_step.get_output(one_input.tag)
-      })
-    step.add_property(PropertyNames.INPUTS, inputs)
-    step.encoding = self._get_encoded_output_coder(transform_node)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{
-            PropertyNames.USER_NAME: (
-                '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-            PropertyNames.ENCODING: step.encoding,
-            PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-        }])
-
   # TODO(srohde): Remove this after internal usages have been removed.
   def apply_GroupByKey(self, transform, pcoll, options):
     return transform.expand(pcoll)
@@ -784,512 +510,6 @@ class DataflowRunner(PipelineRunner):
     coders.registry.verify_deterministic(
         coder.key_coder(), 'GroupByKey operation "%s"' % transform.label)
 
-  def run_GroupByKey(self, transform_node, options):
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-
-    # Verify that the GBK's parent has a KV coder.
-    self._verify_gbk_coders(transform_node.transform, transform_node.inputs[0])
-
-    step = self._add_step(
-        TransformNames.GROUP, transform_node.full_label, transform_node)
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {
-            '@type': 'OutputReference',
-            PropertyNames.STEP_NAME: input_step.proto.name,
-            PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
-        })
-    step.encoding = self._get_encoded_output_coder(transform_node)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{
-            PropertyNames.USER_NAME: (
-                '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-            PropertyNames.ENCODING: step.encoding,
-            PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-        }])
-    windowing = transform_node.transform.get_windowing(transform_node.inputs)
-    step.add_property(
-        PropertyNames.SERIALIZED_FN,
-        self.serialize_windowing_strategy(windowing, 
self._default_environment))
-
-  def run_ExternalTransform(self, transform_node, options):
-    # Adds a dummy step to the Dataflow job description so that inputs and
-    # outputs are mapped correctly in the presence of external transforms.
-    #
-    # Note that Dataflow Python multi-language pipelines use Portable Job
-    # Submission by default, hence this step and rest of the Dataflow step
-    # definitions defined here are not used at Dataflow service but we have to
-    # maintain the mapping correctly till we can fully drop the Dataflow step
-    # definitions from the SDK.
-
-    # AppliedTransform node outputs have to be updated to correctly map the
-    # outputs for external transforms.
-    transform_node.outputs = ({
-        output.tag: output
-        for output in transform_node.outputs.values()
-    })
-
-    self.run_Impulse(transform_node, options)
-
-  def run_ParDo(self, transform_node, options):
-    transform = transform_node.transform
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-
-    # Attach side inputs.
-    si_dict = {}
-    si_labels = {}
-    full_label_counts = defaultdict(int)
-    lookup_label = lambda side_pval: si_labels[side_pval]
-    named_inputs = transform_node.named_inputs()
-    label_renames = {}
-    for ix, side_pval in enumerate(transform_node.side_inputs):
-      assert isinstance(side_pval, AsSideInput)
-      step_name = 'SideInput-' + self._get_unique_step_name()
-      si_label = ((SIDE_INPUT_PREFIX + '%d-%s') %
-                  (ix, transform_node.full_label))
-      old_label = (SIDE_INPUT_PREFIX + '%d') % ix
-
-      label_renames[old_label] = si_label
-
-      assert old_label in named_inputs
-      pcollection_label = '%s.%s' % (
-          side_pval.pvalue.producer.full_label.split('/')[-1],
-          side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
-      si_full_label = '%s/%s(%s.%s)' % (
-          transform_node.full_label,
-          side_pval.__class__.__name__,
-          pcollection_label,
-          full_label_counts[pcollection_label])
-
-      # Count the number of times the same PCollection is a side input
-      # to the same ParDo.
-      full_label_counts[pcollection_label] += 1
-
-      self._add_singleton_step(
-          step_name,
-          si_full_label,
-          side_pval.pvalue.tag,
-          self._cache.get_pvalue(side_pval.pvalue),
-          side_pval.pvalue.windowing,
-          side_pval._side_input_data().access_pattern)
-      si_dict[si_label] = {
-          '@type': 'OutputReference',
-          PropertyNames.STEP_NAME: step_name,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-      }
-      si_labels[side_pval] = si_label
-
-    # Now create the step for the ParDo transform being handled.
-    transform_name = transform_node.full_label.rsplit('/', 1)[-1]
-    step = self._add_step(
-        TransformNames.DO,
-        transform_node.full_label +
-        ('/{}'.format(transform_name) if transform_node.side_inputs else ''),
-        transform_node,
-        transform_node.transform.output_tags)
-    transform_proto = self.proto_context.transforms.get_proto(transform_node)
-    transform_id = self.proto_context.transforms.get_id(transform_node)
-    is_runner_v2 = _is_runner_v2(options)
-    # Patch side input ids to be unique across a given pipeline.
-    if (label_renames and
-        transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
-      # Patch PTransform proto.
-      for old, new in label_renames.items():
-        transform_proto.inputs[new] = transform_proto.inputs[old]
-        del transform_proto.inputs[old]
-
-      # Patch ParDo proto.
-      proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
-      proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type)
-      for old, new in label_renames.items():
-        proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
-        del proto.side_inputs[old]
-      transform_proto.spec.payload = proto.SerializeToString()
-      # We need to update the pipeline proto.
-      del self.proto_pipeline.components.transforms[transform_id]
-      (
-          self.proto_pipeline.components.transforms[transform_id].CopyFrom(
-              transform_proto))
-    # The data transmitted in SERIALIZED_FN is different depending on whether
-    # this is a runner v2 pipeline or not.
-    if is_runner_v2:
-      serialized_data = transform_id
-    else:
-      serialized_data = pickler.dumps(
-          self._pardo_fn_data(transform_node, lookup_label))
-    step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
-    # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
-    # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, 
transform_id)
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {
-            '@type': 'OutputReference',
-            PropertyNames.STEP_NAME: input_step.proto.name,
-            PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
-        })
-    # Add side inputs if any.
-    step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)
-
-    # Generate description for the outputs. The output names
-    # will be 'None' for main output and '<tag>' for a tagged output.
-    outputs = []
-
-    all_output_tags = list(transform_proto.outputs.keys())
-
-    # Some external transforms require output tags to not be modified.
-    # So we randomly select one of the output tags as the main output and
-    # leave others as side outputs. Transform execution should not change
-    # dependending on which output tag we choose as the main output here.
-    # Also, some SDKs do not work correctly if output tags are modified. So for
-    # external transforms, we leave tags unmodified.
-    #
-    # Python SDK uses 'None' as the tag of the main output.
-    main_output_tag = 'None'
-
-    step.encoding = self._get_encoded_output_coder(
-        transform_node, output_tag=main_output_tag)
-
-    side_output_tags = set(all_output_tags).difference({main_output_tag})
-
-    # Add the main output to the description.
-    outputs.append({
-        PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-        PropertyNames.ENCODING: step.encoding,
-        PropertyNames.OUTPUT_NAME: main_output_tag
-    })
-    for side_tag in side_output_tags:
-      # The assumption here is that all outputs will have the same typehint
-      # and coder as the main output. This is certainly the case right now
-      # but conceivably it could change in the future.
-      encoding = self._get_encoded_output_coder(
-          transform_node, output_tag=side_tag)
-      outputs.append({
-          PropertyNames.USER_NAME: (
-              '%s.%s' % (transform_node.full_label, side_tag)),
-          PropertyNames.ENCODING: encoding,
-          PropertyNames.OUTPUT_NAME: side_tag
-      })
-
-    step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-
-    # Add the restriction encoding if we are a splittable DoFn
-    restriction_coder = transform.get_restriction_coder()
-    if restriction_coder:
-      step.add_property(
-          PropertyNames.RESTRICTION_ENCODING,
-          self._get_cloud_encoding(restriction_coder))
-
-    if options.view_as(StandardOptions).streaming:
-      is_stateful_dofn = (DoFnSignature(transform.dofn).is_stateful_dofn())
-      if is_stateful_dofn:
-        step.add_property(PropertyNames.USES_KEYED_STATE, 'true')
-
-        # Also checks whether the step allows shardable keyed states.
-        # TODO(BEAM-11360): remove this when migrated to portable job
-        #  submission since we only consider supporting the property in runner
-        #  v2.
-        for pcoll in transform_node.outputs.values():
-          if pcoll._unique_name() in self.get_pcoll_with_auto_sharding():
-            step.add_property(PropertyNames.ALLOWS_SHARDABLE_STATE, 'true')
-            # Currently we only allow auto-sharding to be enabled through the
-            # GroupIntoBatches transform. So we also add the following property
-            # which GroupIntoBatchesDoFn has, to allow the backend to perform
-            # graph optimization.
-            step.add_property(PropertyNames.PRESERVES_KEYS, 'true')
-            break
-
-  @staticmethod
-  def _pardo_fn_data(transform_node, get_label):
-    transform = transform_node.transform
-    si_tags_and_types = [  # pylint: disable=protected-access
-        (get_label(side_pval), side_pval.__class__, side_pval._view_options())
-        for side_pval in transform_node.side_inputs]
-    return (
-        transform.fn,
-        transform.args,
-        transform.kwargs,
-        si_tags_and_types,
-        transform_node.inputs[0].windowing)
-
-  def run_CombineValuesReplacement(self, transform_node, options):
-    transform = transform_node.transform.transform
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-    step = self._add_step(
-        TransformNames.COMBINE, transform_node.full_label, transform_node)
-    transform_id = self.proto_context.transforms.get_id(transform_node.parent)
-
-    # The data transmitted in SERIALIZED_FN is different depending on whether
-    # this is a runner v2 pipeline or not.
-    if _is_runner_v2(options):
-      # Fnapi pipelines send the transform ID of the CombineValues transform's
-      # parent composite because Dataflow expects the ID of a CombinePerKey
-      # transform.
-      serialized_data = transform_id
-    else:
-      # Combiner functions do not take deferred side-inputs (i.e. PValues) and
-      # therefore the code to handle extra args/kwargs is simpler than for the
-      # DoFn's of the ParDo transform. In the last, empty argument is where
-      # side inputs information would go.
-      serialized_data = pickler.dumps(
-          (transform.fn, transform.args, transform.kwargs, ()))
-    step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
-    # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
-    # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, 
transform_id)
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {
-            '@type': 'OutputReference',
-            PropertyNames.STEP_NAME: input_step.proto.name,
-            PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
-        })
-    # Note that the accumulator must not have a WindowedValue encoding, while
-    # the output of this step does in fact have a WindowedValue encoding.
-    accumulator_encoding = self._get_cloud_encoding(
-        transform.fn.get_accumulator_coder())
-    output_encoding = self._get_encoded_output_coder(transform_node)
-
-    step.encoding = output_encoding
-    step.add_property(PropertyNames.ENCODING, accumulator_encoding)
-    # Generate description for main output 'out.'
-    outputs = []
-    # Add the main output to the description.
-    outputs.append({
-        PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-        PropertyNames.ENCODING: step.encoding,
-        PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-    })
-    step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-
-  def run_Read(self, transform_node, options):
-    transform = transform_node.transform
-    step = self._add_step(
-        TransformNames.READ, transform_node.full_label, transform_node)
-    # TODO(mairbek): refactor if-else tree to use registerable functions.
-    # Initialize the source specific properties.
-
-    standard_options = options.view_as(StandardOptions)
-    if not hasattr(transform.source, 'format'):
-      # If a format is not set, we assume the source to be a custom source.
-      source_dict = {}
-
-      source_dict['spec'] = {
-          '@type': names.SOURCE_TYPE,
-          names.SERIALIZED_SOURCE_KEY: pickler.dumps(transform.source)
-      }
-
-      try:
-        source_dict['metadata'] = {
-            'estimated_size_bytes': json_value.get_typed_value_descriptor(
-                transform.source.estimate_size())
-        }
-      except error.RuntimeValueProviderError:
-        # Size estimation is best effort, and this error is by value provider.
-        _LOGGER.info(
-            'Could not estimate size of source %r due to ' + \
-            'RuntimeValueProviderError', transform.source)
-      except Exception:  # pylint: disable=broad-except
-        # Size estimation is best effort. So we log the error and continue.
-        _LOGGER.info(
-            'Could not estimate size of source %r due to an exception: %s',
-            transform.source,
-            traceback.format_exc())
-
-      step.add_property(PropertyNames.SOURCE_STEP_INPUT, source_dict)
-    elif transform.source.format == 'pubsub':
-      if not standard_options.streaming:
-        raise ValueError(
-            'Cloud Pub/Sub is currently available for use '
-            'only in streaming pipelines.')
-      # Only one of topic or subscription should be set.
-      if transform.source.full_subscription:
-        step.add_property(
-            PropertyNames.PUBSUB_SUBSCRIPTION,
-            transform.source.full_subscription)
-      elif transform.source.full_topic:
-        step.add_property(
-            PropertyNames.PUBSUB_TOPIC, transform.source.full_topic)
-      if transform.source.id_label:
-        step.add_property(
-            PropertyNames.PUBSUB_ID_LABEL, transform.source.id_label)
-      if transform.source.with_attributes:
-        # Setting this property signals Dataflow runner to return full
-        # PubsubMessages instead of just the data part of the payload.
-        step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
-
-      if transform.source.timestamp_attribute is not None:
-        step.add_property(
-            PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
-            transform.source.timestamp_attribute)
-    else:
-      raise ValueError(
-          'Source %r has unexpected format %s.' %
-          (transform.source, transform.source.format))
-
-    if not hasattr(transform.source, 'format'):
-      step.add_property(PropertyNames.FORMAT, names.SOURCE_FORMAT)
-    else:
-      step.add_property(PropertyNames.FORMAT, transform.source.format)
-
-    # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a
-    # step should be the type of value outputted by each step.  Read steps
-    # automatically wrap output values in a WindowedValue wrapper, if 
necessary.
-    # This is also necessary for proper encoding for size estimation.
-    # Using a GlobalWindowCoder as a place holder instead of the default
-    # PickleCoder because GlobalWindowCoder is known coder.
-    # TODO(robertwb): Query the collection for the windowfn to extract the
-    # correct coder.
-    coder = coders.WindowedValueCoder(
-        coders.registry.get_coder(transform_node.outputs[None].element_type),
-        coders.coders.GlobalWindowCoder())
-
-    step.encoding = self._get_cloud_encoding(coder)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{
-            PropertyNames.USER_NAME: (
-                '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-            PropertyNames.ENCODING: step.encoding,
-            PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-        }])
-
-  def run__NativeWrite(self, transform_node, options):
-    transform = transform_node.transform
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-    step = self._add_step(
-        TransformNames.WRITE, transform_node.full_label, transform_node)
-    # TODO(mairbek): refactor if-else tree to use registerable functions.
-    # Initialize the sink specific properties.
-    if transform.sink.format == 'pubsub':
-      standard_options = options.view_as(StandardOptions)
-      if not standard_options.streaming:
-        raise ValueError(
-            'Cloud Pub/Sub is currently available for use '
-            'only in streaming pipelines.')
-      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.full_topic)
-      if transform.sink.id_label:
-        step.add_property(
-            PropertyNames.PUBSUB_ID_LABEL, transform.sink.id_label)
-      # Setting this property signals Dataflow runner that the PCollection
-      # contains PubsubMessage objects instead of just raw data.
-      step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
-      if transform.sink.timestamp_attribute is not None:
-        step.add_property(
-            PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
-            transform.sink.timestamp_attribute)
-    else:
-      raise ValueError(
-          'Sink %r has unexpected format %s.' %
-          (transform.sink, transform.sink.format))
-    step.add_property(PropertyNames.FORMAT, transform.sink.format)
-
-    # Wrap coder in WindowedValueCoder: this is necessary for proper encoding
-    # for size estimation. Using a GlobalWindowCoder as a place holder instead
-    # of the default PickleCoder because GlobalWindowCoder is known coder.
-    # TODO(robertwb): Query the collection for the windowfn to extract the
-    # correct coder.
-    coder = coders.WindowedValueCoder(
-        transform.sink.coder, coders.coders.GlobalWindowCoder())
-    step.encoding = self._get_cloud_encoding(coder)
-    step.add_property(PropertyNames.ENCODING, step.encoding)
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {
-            '@type': 'OutputReference',
-            PropertyNames.STEP_NAME: input_step.proto.name,
-            PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
-        })
-
-  def run_TestStream(self, transform_node, options):
-    from apache_beam.testing.test_stream import ElementEvent
-    from apache_beam.testing.test_stream import ProcessingTimeEvent
-    from apache_beam.testing.test_stream import WatermarkEvent
-    standard_options = options.view_as(StandardOptions)
-    if not standard_options.streaming:
-      raise ValueError(
-          'TestStream is currently available for use '
-          'only in streaming pipelines.')
-
-    transform = transform_node.transform
-    step = self._add_step(
-        TransformNames.READ, transform_node.full_label, transform_node)
-    step.add_property(
-        PropertyNames.SERIALIZED_FN,
-        self.proto_context.transforms.get_id(transform_node))
-    step.add_property(PropertyNames.FORMAT, 'test_stream')
-    test_stream_payload = beam_runner_api_pb2.TestStreamPayload()
-    # TestStream source doesn't do any decoding of elements,
-    # so we won't set test_stream_payload.coder_id.
-    output_coder = transform._infer_output_coder()  # pylint: 
disable=protected-access
-    for event in transform._events:
-      new_event = test_stream_payload.events.add()
-      if isinstance(event, ElementEvent):
-        for tv in event.timestamped_values:
-          element = new_event.element_event.elements.add()
-          element.encoded_element = output_coder.encode(tv.value)
-          element.timestamp = tv.timestamp.micros
-      elif isinstance(event, ProcessingTimeEvent):
-        new_event.processing_time_event.advance_duration = (
-            event.advance_by.micros)
-      elif isinstance(event, WatermarkEvent):
-        new_event.watermark_event.new_watermark = event.new_watermark.micros
-    serialized_payload = self.byte_array_to_json_string(
-        test_stream_payload.SerializeToString())
-    step.add_property(PropertyNames.SERIALIZED_TEST_STREAM, serialized_payload)
-
-    step.encoding = self._get_encoded_output_coder(transform_node)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{
-            PropertyNames.USER_NAME: (
-                '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-            PropertyNames.ENCODING: step.encoding,
-            PropertyNames.OUTPUT_NAME: PropertyNames.OUT
-        }])
-
-  # We must mark this method as not a test or else its name is a matcher for
-  # nosetest tests.
-  run_TestStream.__test__ = False  # type: ignore[attr-defined]
-
-  @classmethod
-  def serialize_windowing_strategy(cls, windowing, default_environment):
-    from apache_beam.runners import pipeline_context
-    context = pipeline_context.PipelineContext(
-        default_environment=default_environment)
-    windowing_proto = windowing.to_runner_api(context)
-    return cls.byte_array_to_json_string(
-        beam_runner_api_pb2.MessageWithComponents(
-            components=context.to_runner_api(),
-            windowing_strategy=windowing_proto).SerializeToString())
-
-  @classmethod
-  def deserialize_windowing_strategy(cls, serialized_data):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners import pipeline_context
-    from apache_beam.transforms.core import Windowing
-    proto = beam_runner_api_pb2.MessageWithComponents()
-    proto.ParseFromString(cls.json_string_to_byte_array(serialized_data))
-    return Windowing.from_runner_api(
-        proto.windowing_strategy,
-        pipeline_context.PipelineContext(proto.components))
-
-  @staticmethod
-  def byte_array_to_json_string(raw_bytes):
-    """Implements 
org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
-    return quote(raw_bytes)
-
-  @staticmethod
-  def json_string_to_byte_array(encoded_string):
-    """Implements 
org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
-    return unquote_to_bytes(encoded_string)
-
   def get_default_gcp_region(self):
     """Get a default value for Google Cloud region according to
     https://cloud.google.com/compute/docs/gcloud-compute/#default-properties.
@@ -1348,6 +568,8 @@ def _check_and_add_missing_options(options):
   options.view_as(
       GoogleCloudOptions).dataflow_service_options = dataflow_service_options
 
+  _add_runner_v2_missing_options(options)
+
   # Ensure that prime is specified as an experiment if specified as a dataflow
   # service option
   if 'enable_prime' in dataflow_service_options:
@@ -1359,11 +581,6 @@ def _check_and_add_missing_options(options):
   # Runner v2 only supports using streaming engine (aka windmill service)
   if options.view_as(StandardOptions).streaming:
     google_cloud_options = options.view_as(GoogleCloudOptions)
-    if _is_runner_v2_disabled(options):
-      raise ValueError(
-          'Disabling Runner V2 no longer supported for streaming pipeline '
-          'using Beam Python %s.' % beam.version.__version__)
-
     if (not google_cloud_options.enable_streaming_engine and
         (debug_options.lookup_experiment("enable_windmill_service") or
          debug_options.lookup_experiment("enable_streaming_engine"))):
@@ -1380,29 +597,6 @@ def _check_and_add_missing_options(options):
     google_cloud_options.enable_streaming_engine = True
     debug_options.add_experiment("enable_streaming_engine")
     debug_options.add_experiment("enable_windmill_service")
-    _add_runner_v2_missing_options(debug_options)
-  elif (debug_options.lookup_experiment('enable_prime') or
-        debug_options.lookup_experiment('beam_fn_api') or
-        debug_options.lookup_experiment('use_unified_worker') or
-        debug_options.lookup_experiment('use_runner_v2') or
-        debug_options.lookup_experiment('use_portable_job_submission')):
-    if _is_runner_v2_disabled(options):
-      raise ValueError(
-          """Runner V2 both disabled and enabled: at least one of
-          ['enable_prime', 'beam_fn_api', 'use_unified_worker', 
'use_runner_v2',
-          'use_portable_job_submission'] is set and also one of
-          ['disable_runner_v2', 'disable_runner_v2_until_2023',
-          'disable_prime_runner_v2'] is set.""")
-    _add_runner_v2_missing_options(debug_options)
-
-
-def _is_runner_v2(options):
-  # Type: (PipelineOptions) -> bool
-
-  """Returns true if runner v2 is enabled."""
-  _check_and_add_missing_options(options)
-  return options.view_as(DebugOptions).lookup_experiment(
-      'use_runner_v2', default=False)
 
 
 def _is_runner_v2_disabled(options):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index b00644d13fd..1e084b98278 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -19,19 +19,13 @@
 
 # pytype: skip-file
 
-import json
 import unittest
-from datetime import datetime
-from itertools import product
 
 import mock
-from parameterized import param
-from parameterized import parameterized
 
 import apache_beam as beam
 import apache_beam.transforms as ptransform
 from apache_beam.options.pipeline_options import DebugOptions
-from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import AppliedPTransform
 from apache_beam.pipeline import Pipeline
@@ -45,18 +39,13 @@ from apache_beam.runners import common
 from apache_beam.runners import create_runner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import 
DataflowRuntimeException
-from apache_beam.runners.dataflow.dataflow_runner import PropertyNames
-from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2
-from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled
+from apache_beam.runners.dataflow.dataflow_runner import 
_check_and_add_missing_options
 from apache_beam.runners.dataflow.internal.clients import dataflow as 
dataflow_api
 from apache_beam.runners.runner import PipelineState
 from apache_beam.testing.extra_assertions import ExtraAssertionsMixin
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import combiners
 from apache_beam.transforms import environments
-from apache_beam.transforms import window
-from apache_beam.transforms.core import Windowing
-from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.typehints import typehints
 
 # Protect against environments where apitools library is not available.
@@ -262,49 +251,6 @@ class DataflowRunnerTest(unittest.TestCase, 
ExtraAssertionsMixin):
           | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
           | ptransform.GroupByKey())
 
-  def test_remote_runner_display_data(self):
-    remote_runner = DataflowRunner()
-    p = Pipeline(
-        remote_runner, options=PipelineOptions(self.default_properties))
-
-    now = datetime.now()
-    # pylint: disable=expression-not-assigned
-    (
-        p | ptransform.Create([1, 2, 3, 4, 5])
-        | 'Do' >> SpecialParDo(SpecialDoFn(), now))
-
-    # TODO(https://github.com/apache/beam/issues/18012) Enable runner API on
-    # this test.
-    p.run(test_runner_api=False)
-    job_dict = json.loads(str(remote_runner.job))
-    steps = [
-        step for step in job_dict['steps']
-        if len(step['properties'].get('display_data', [])) > 0
-    ]
-    step = steps[1]
-    disp_data = step['properties']['display_data']
-    nspace = SpecialParDo.__module__ + '.'
-    expected_data = [{
-        'type': 'TIMESTAMP',
-        'namespace': nspace + 'SpecialParDo',
-        'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
-        'key': 'a_time'
-    },
-                     {
-                         'type': 'STRING',
-                         'namespace': nspace + 'SpecialParDo',
-                         'value': nspace + 'SpecialParDo',
-                         'key': 'a_class',
-                         'shortValue': 'SpecialParDo'
-                     },
-                     {
-                         'type': 'INTEGER',
-                         'namespace': nspace + 'SpecialDoFn',
-                         'value': 42,
-                         'key': 'dofn_value'
-                     }]
-    self.assertUnhashableCountEqual(disp_data, expected_data)
-
   def test_group_by_key_input_visitor_with_valid_inputs(self):
     p = TestPipeline()
     pcoll1 = PCollection(p)
@@ -391,15 +337,6 @@ class DataflowRunnerTest(unittest.TestCase, 
ExtraAssertionsMixin):
     self.assertEqual(flat.element_type, none_str_pc.element_type)
     self.assertEqual(flat.element_type, none_int_pc.element_type)
 
-  def test_serialize_windowing_strategy(self):
-    # This just tests the basic path; more complete tests
-    # are in window_test.py.
-    strategy = Windowing(window.FixedWindows(10))
-    self.assertEqual(
-        strategy,
-        DataflowRunner.deserialize_windowing_strategy(
-            DataflowRunner.serialize_windowing_strategy(strategy, None)))
-
   def test_side_input_visitor(self):
     p = TestPipeline()
     pc = p | beam.Create([])
@@ -411,8 +348,7 @@ class DataflowRunnerTest(unittest.TestCase, 
ExtraAssertionsMixin):
         beam.pvalue.AsSingleton(pc),
         beam.pvalue.AsMultiMap(pc))
     applied_transform = AppliedPTransform(None, transform, "label", {'pc': pc})
-    DataflowRunner.side_input_visitor(
-        is_runner_v2=True).visit_transform(applied_transform)
+    DataflowRunner.side_input_visitor().visit_transform(applied_transform)
     self.assertEqual(2, len(applied_transform.side_inputs))
     self.assertEqual(
         common_urns.side_inputs.ITERABLE.urn,
@@ -504,116 +440,6 @@ class DataflowRunnerTest(unittest.TestCase, 
ExtraAssertionsMixin):
     result = runner.get_default_gcp_region()
     self.assertIsNone(result)
 
-  def test_combine_values_translation(self):
-    runner = DataflowRunner()
-
-    with beam.Pipeline(runner=runner,
-                       options=PipelineOptions(self.default_properties)) as p:
-      (  # pylint: disable=expression-not-assigned
-          p
-          | beam.Create([('a', [1, 2]), ('b', [3, 4])])
-          | beam.CombineValues(lambda v, _: sum(v)))
-
-    job_dict = json.loads(str(runner.job))
-    self.assertIn(
-        'CombineValues', set(step['kind'] for step in job_dict['steps']))
-
-  def _find_step(self, job, step_name):
-    job_dict = json.loads(str(job))
-    maybe_step = [
-        s for s in job_dict['steps']
-        if s['properties']['user_name'] == step_name
-    ]
-    self.assertTrue(maybe_step, 'Could not find step {}'.format(step_name))
-    return maybe_step[0]
-
-  def expect_correct_override(self, job, step_name, step_kind):
-    """Expects that a transform was correctly overriden."""
-
-    # If the typing information isn't being forwarded correctly, the component
-    # encodings here will be incorrect.
-    expected_output_info = [{
-        "encoding": {
-            "@type": "kind:windowed_value",
-            "component_encodings": [{
-                "@type": "kind:bytes"
-            }, {
-                "@type": "kind:global_window"
-            }],
-            "is_wrapper": True
-        },
-        "output_name": "out",
-        "user_name": step_name + ".out"
-    }]
-
-    step = self._find_step(job, step_name)
-    self.assertEqual(step['kind'], step_kind)
-
-    # The display data here is forwarded because the replace transform is
-    # subclassed from iobase.Read.
-    self.assertGreater(len(step['properties']['display_data']), 0)
-    self.assertEqual(step['properties']['output_info'], expected_output_info)
-
-  def test_read_create_translation(self):
-    runner = DataflowRunner()
-
-    with beam.Pipeline(runner=runner,
-                       options=PipelineOptions(self.default_properties)) as p:
-      # pylint: disable=expression-not-assigned
-      p | beam.Create([b'a', b'b', b'c'])
-
-    self.expect_correct_override(runner.job, 'Create/Read', 'ParallelRead')
-
-  def test_read_pubsub_translation(self):
-    runner = DataflowRunner()
-
-    self.default_properties.append("--streaming")
-
-    with beam.Pipeline(runner=runner,
-                       options=PipelineOptions(self.default_properties)) as p:
-      # pylint: disable=expression-not-assigned
-      p | beam.io.ReadFromPubSub(topic='projects/project/topics/topic')
-
-    self.expect_correct_override(
-        runner.job, 'ReadFromPubSub/Read', 'ParallelRead')
-
-  def test_gbk_translation(self):
-    runner = DataflowRunner()
-    with beam.Pipeline(runner=runner,
-                       options=PipelineOptions(self.default_properties)) as p:
-      # pylint: disable=expression-not-assigned
-      p | beam.Create([(1, 2)]) | beam.GroupByKey()
-
-    expected_output_info = [{
-        "encoding": {
-            "@type": "kind:windowed_value",
-            "component_encodings": [{
-                "@type": "kind:pair",
-                "component_encodings": [{
-                    "@type": "kind:varint"
-                },
-                {
-                    "@type": "kind:stream",
-                    "component_encodings": [{
-                        "@type": "kind:varint"
-                    }],
-                    "is_stream_like": True
-                }],
-                "is_pair_like": True
-            }, {
-                "@type": "kind:global_window"
-            }],
-            "is_wrapper": True
-        },
-        "output_name": "out",
-        "user_name": "GroupByKey.out"
-    }]  # yapf: disable
-
-    gbk_step = self._find_step(runner.job, 'GroupByKey')
-    self.assertEqual(gbk_step['kind'], 'GroupByKey')
-    self.assertEqual(
-        gbk_step['properties']['output_info'], expected_output_info)
-
   @unittest.skip(
       'https://github.com/apache/beam/issues/18716: enable once '
       'CombineFnVisitor is fixed')
@@ -646,43 +472,6 @@ class DataflowRunnerTest(unittest.TestCase, 
ExtraAssertionsMixin):
     except ValueError:
       self.fail('ValueError raised unexpectedly')
 
-  def _run_group_into_batches_and_get_step_properties(
-      self, with_sharded_key, additional_properties):
-    self.default_properties.append('--streaming')
-    for property in additional_properties:
-      self.default_properties.append(property)
-
-    runner = DataflowRunner()
-    with beam.Pipeline(runner=runner,
-                       options=PipelineOptions(self.default_properties)) as p:
-      # pylint: disable=expression-not-assigned
-      input = p | beam.Create([('a', 1), ('a', 1), ('b', 3), ('b', 4)])
-      if with_sharded_key:
-        (
-            input | beam.GroupIntoBatches.WithShardedKey(2)
-            | beam.Map(lambda key_values: (key_values[0].key, key_values[1])))
-        step_name = (
-            'WithShardedKey/GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)')
-      else:
-        input | beam.GroupIntoBatches(2)
-        step_name = 'GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)'
-
-    return self._find_step(runner.job, step_name)['properties']
-
-  def test_group_into_batches_translation(self):
-    properties = self._run_group_into_batches_and_get_step_properties(
-        True, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
-    self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], 'true')
-    self.assertEqual(properties[PropertyNames.ALLOWS_SHARDABLE_STATE], 'true')
-    self.assertEqual(properties[PropertyNames.PRESERVES_KEYS], 'true')
-
-  def test_group_into_batches_translation_non_sharded(self):
-    properties = self._run_group_into_batches_and_get_step_properties(
-        False, ['--enable_streaming_engine', '--experiments=use_runner_v2'])
-    self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], 'true')
-    self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
-    self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)
-
   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):
       def annotations(self):
@@ -711,141 +500,44 @@ class DataflowRunnerTest(unittest.TestCase, 
ExtraAssertionsMixin):
     self.assertNotIn(unpacked_maximum_step_name, transform_names)
     self.assertIn(packed_step_name, transform_names)
 
-  @parameterized.expand([
-      param(memory_hint='min_ram'),
-      param(memory_hint='minRam'),
-  ])
-  def test_resource_hints_translation(self, memory_hint):
-    runner = DataflowRunner()
-    self.default_properties.append('--resource_hint=accelerator=some_gpu')
-    self.default_properties.append(f'--resource_hint={memory_hint}=20GB')
-    with beam.Pipeline(runner=runner,
-                       options=PipelineOptions(self.default_properties)) as p:
-      # pylint: disable=expression-not-assigned
-      (
-          p
-          | beam.Create([1])
-          | 'MapWithHints' >> beam.Map(lambda x: x + 1).with_resource_hints(
-              min_ram='10GB',
-              
accelerator='type:nvidia-tesla-k80;count:1;install-nvidia-drivers'
-          ))
-
-    step = self._find_step(runner.job, 'MapWithHints')
-    self.assertEqual(
-        step['properties']['resource_hints'],
-        {
-            'beam:resources:min_ram_bytes:v1': '20000000000',
-            'beam:resources:accelerator:v1': \
-                'type%3Anvidia-tesla-k80%3Bcount%3A1%3Binstall-nvidia-drivers'
-        })
-
-  @parameterized.expand([
-      (
-          "%s_%s" % (enable_option, disable_option),
-          enable_option,
-          disable_option)
-      for (enable_option,
-           disable_option) in product([
-               False,
-               'enable_prime',
-               'beam_fn_api',
-               'use_unified_worker',
-               'use_runner_v2',
-               'use_portable_job_submission'
-           ],
-                                      [
-                                          False,
-                                          'disable_runner_v2',
-                                          'disable_runner_v2_until_2023',
-                                          'disable_prime_runner_v2'
-                                      ])
-  ])
-  def test_batch_is_runner_v2(self, name, enable_option, disable_option):
-    options = PipelineOptions(
-        (['--experiments=%s' % enable_option] if enable_option else []) +
-        (['--experiments=%s' % disable_option] if disable_option else []))
-    if (enable_option and disable_option):
-      with self.assertRaisesRegex(ValueError,
-                                  'Runner V2 both disabled and enabled'):
-        _is_runner_v2(options)
-    elif enable_option:
-      self.assertTrue(_is_runner_v2(options))
-      self.assertFalse(_is_runner_v2_disabled(options))
-      for expected in ['beam_fn_api',
-                       'use_unified_worker',
-                       'use_runner_v2',
-                       'use_portable_job_submission']:
-        self.assertTrue(
-            options.view_as(DebugOptions).lookup_experiment(expected, False))
-      if enable_option == 'enable_prime':
-        self.assertIn(
-            'enable_prime',
-            options.view_as(GoogleCloudOptions).dataflow_service_options)
-    elif disable_option:
-      self.assertFalse(_is_runner_v2(options))
-      self.assertTrue(_is_runner_v2_disabled(options))
-    else:
-      self.assertFalse(_is_runner_v2(options))
-
-  @parameterized.expand([
-      (
-          "%s_%s" % (enable_option, disable_option),
-          enable_option,
-          disable_option)
-      for (enable_option,
-           disable_option) in product([
-               False,
-               'enable_prime',
-               'beam_fn_api',
-               'use_unified_worker',
-               'use_runner_v2',
-               'use_portable_job_submission'
-           ],
-                                      [
-                                          False,
-                                          'disable_runner_v2',
-                                          'disable_runner_v2_until_2023',
-                                          'disable_prime_runner_v2'
-                                      ])
-  ])
-  def test_streaming_is_runner_v2(self, name, enable_option, disable_option):
-    options = PipelineOptions(
-        ['--streaming'] +
-        (['--experiments=%s' % enable_option] if enable_option else []) +
-        (['--experiments=%s' % disable_option] if disable_option else []))
-    if disable_option:
-      with self.assertRaisesRegex(
-          ValueError,
-          'Disabling Runner V2 no longer supported for streaming pipeline'):
-        _is_runner_v2(options)
-    else:
-      self.assertTrue(_is_runner_v2(options))
-      for expected in ['beam_fn_api',
-                       'use_unified_worker',
-                       'use_runner_v2',
-                       'use_portable_job_submission',
-                       'enable_windmill_service',
-                       'enable_streaming_engine']:
-        self.assertTrue(
-            options.view_as(DebugOptions).lookup_experiment(expected, False))
-      if enable_option == 'enable_prime':
-        self.assertIn(
-            'enable_prime',
-            options.view_as(GoogleCloudOptions).dataflow_service_options)
+  def test_batch_is_runner_v2(self):
+    options = PipelineOptions()
+    _check_and_add_missing_options(options)
+    for expected in ['beam_fn_api',
+                     'use_unified_worker',
+                     'use_runner_v2',
+                     'use_portable_job_submission']:
+      self.assertTrue(
+          options.view_as(DebugOptions).lookup_experiment(expected, False),
+          expected)
+
+  def test_streaming_is_runner_v2(self):
+    options = PipelineOptions(['--streaming'])
+    _check_and_add_missing_options(options)
+    for expected in ['beam_fn_api',
+                     'use_unified_worker',
+                     'use_runner_v2',
+                     'use_portable_job_submission',
+                     'enable_windmill_service',
+                     'enable_streaming_engine']:
+      self.assertTrue(
+          options.view_as(DebugOptions).lookup_experiment(expected, False),
+          expected)
 
   def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
     options = PipelineOptions(['--dataflow_service_options=enable_prime'])
-    self.assertTrue(_is_runner_v2(options))
+    _check_and_add_missing_options(options)
     for expected in ['beam_fn_api',
                      'use_unified_worker',
                      'use_runner_v2',
                      'use_portable_job_submission']:
       self.assertTrue(
-          options.view_as(DebugOptions).lookup_experiment(expected, False))
+          options.view_as(DebugOptions).lookup_experiment(expected, False),
+          expected)
 
     options = PipelineOptions(
         ['--streaming', '--dataflow_service_options=enable_prime'])
-    self.assertTrue(_is_runner_v2(options))
+    _check_and_add_missing_options(options)
     for expected in ['beam_fn_api',
                      'use_unified_worker',
                      'use_runner_v2',
@@ -853,7 +545,8 @@ class DataflowRunnerTest(unittest.TestCase, 
ExtraAssertionsMixin):
                      'enable_windmill_service',
                      'enable_streaming_engine']:
       self.assertTrue(
-          options.view_as(DebugOptions).lookup_experiment(expected, False))
+          options.view_as(DebugOptions).lookup_experiment(expected, False),
+          expected)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index bffcc6d6634..ff1beeab510 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -65,7 +65,6 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.common import validate_pipeline_graph
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow
-from apache_beam.runners.dataflow.internal.names import PropertyNames
 from apache_beam.runners.internal import names as shared_names
 from apache_beam.runners.portability.stager import Stager
 from apache_beam.transforms import DataflowDistributionCounter
@@ -86,63 +85,6 @@ _LOGGER = logging.getLogger(__name__)
 _PYTHON_VERSIONS_SUPPORTED_BY_DATAFLOW = ['3.8', '3.9', '3.10', '3.11']
 
 
-class Step(object):
-  """Wrapper for a dataflow Step protobuf."""
-  def __init__(self, step_kind, step_name, additional_properties=None):
-    self.step_kind = step_kind
-    self.step_name = step_name
-    self.proto = dataflow.Step(kind=step_kind, name=step_name)
-    self.proto.properties = {}
-    self._additional_properties = []
-
-    if additional_properties is not None:
-      for (n, v, t) in additional_properties:
-        self.add_property(n, v, t)
-
-  def add_property(self, name, value, with_type=False):
-    self._additional_properties.append((name, value, with_type))
-    self.proto.properties.additionalProperties.append(
-        dataflow.Step.PropertiesValue.AdditionalProperty(
-            key=name, value=to_json_value(value, with_type=with_type)))
-
-  def _get_outputs(self):
-    """Returns a list of all output labels for a step."""
-    outputs = []
-    for p in self.proto.properties.additionalProperties:
-      if p.key == PropertyNames.OUTPUT_INFO:
-        for entry in p.value.array_value.entries:
-          for entry_prop in entry.object_value.properties:
-            if entry_prop.key == PropertyNames.OUTPUT_NAME:
-              outputs.append(entry_prop.value.string_value)
-    return outputs
-
-  def __reduce__(self):
-    """Reduce hook for pickling the Step class more easily."""
-    return (Step, (self.step_kind, self.step_name, 
self._additional_properties))
-
-  def get_output(self, tag=None):
-    """Returns name if it is one of the outputs or first output if name is 
None.
-
-    Args:
-      tag: tag of the output as a string or None if we want to get the
-        name of the first output.
-
-    Returns:
-      The name of the output associated with the tag or the first output
-      if tag was None.
-
-    Raises:
-      ValueError: if the tag does not exist within outputs.
-    """
-    outputs = self._get_outputs()
-    if tag is None or len(outputs) == 1:
-      return outputs[0]
-    else:
-      if tag not in outputs:
-        raise ValueError('Cannot find named output: %s in %s.' % (tag, 
outputs))
-      return tag
-
-
 class Environment(object):
   """Wrapper for a dataflow Environment protobuf."""
   def __init__(
@@ -152,7 +94,6 @@ class Environment(object):
       environment_version,
       proto_pipeline_staged_url,
       proto_pipeline=None):
-    from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2
     self.standard_options = options.view_as(StandardOptions)
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
     self.worker_options = options.view_as(WorkerOptions)
@@ -192,10 +133,7 @@ class Environment(object):
     if self.standard_options.streaming:
       job_type = 'FNAPI_STREAMING'
     else:
-      if _is_runner_v2(options):
-        job_type = 'FNAPI_BATCH'
-      else:
-        job_type = 'PYTHON_BATCH'
+      job_type = 'FNAPI_BATCH'
     self.proto.version.additionalProperties.extend([
         dataflow.Environment.VersionValue.AdditionalProperty(
             key='job_type', value=to_json_value(job_type)),
@@ -297,7 +235,7 @@ class Environment(object):
         container_image.capabilities.append(capability)
       pool.sdkHarnessContainerImages.append(container_image)
 
-    if not _is_runner_v2(options) or not pool.sdkHarnessContainerImages:
+    if not pool.sdkHarnessContainerImages:
       pool.workerHarnessContainerImage = (
           get_container_image_from_options(options))
     elif len(pool.sdkHarnessContainerImages) == 1:
@@ -554,11 +492,7 @@ class DataflowApplicationClient(object):
     self._root_staging_location = (
         root_staging_location or self.google_cloud_options.staging_location)
 
-    from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2
-    if _is_runner_v2(options):
-      self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
-    else:
-      self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION
+    self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
 
     if self.google_cloud_options.no_auth:
       credentials = None
@@ -1202,46 +1136,31 @@ def get_container_image_from_options(pipeline_options):
     Returns:
       str: Container image for remote execution.
   """
-  from apache_beam.runners.dataflow.dataflow_runner import 
_is_runner_v2_disabled
   worker_options = pipeline_options.view_as(WorkerOptions)
   if worker_options.sdk_container_image:
     return worker_options.sdk_container_image
 
-  is_runner_v2 = not _is_runner_v2_disabled(pipeline_options)
-
   # Legacy and runner v2 exist in different repositories.
   # Set to legacy format, override if runner v2
   container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY
-  image_name = '{repository}/python{major}{minor}'.format(
+  image_name = '{repository}/beam_python{major}.{minor}_sdk'.format(
       repository=container_repo,
       major=sys.version_info[0],
       minor=sys.version_info[1])
 
-  if is_runner_v2:
-    image_name = '{repository}/beam_python{major}.{minor}_sdk'.format(
-        repository=container_repo,
-        major=sys.version_info[0],
-        minor=sys.version_info[1])
-
-  image_tag = _get_required_container_version(is_runner_v2)
+  image_tag = _get_required_container_version()
   return image_name + ':' + image_tag
 
 
-def _get_required_container_version(is_runner_v2):
+def _get_required_container_version():
   """For internal use only; no backwards-compatibility guarantees.
 
-    Args:
-      is_runner_v2 (bool): True if and only if pipeline is using runner v2.
-
     Returns:
       str: The tag of worker container images in GCR that corresponds to
         current version of the SDK.
     """
   if 'dev' in beam_version.__version__:
-    if is_runner_v2:
-      return names.BEAM_FNAPI_CONTAINER_VERSION
-    else:
-      return names.BEAM_CONTAINER_VERSION
+    return names.BEAM_FNAPI_CONTAINER_VERSION
   else:
     return _get_container_image_tag()
 
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 89ac58a727b..22e779a8c27 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -653,25 +653,6 @@ class UtilTest(unittest.TestCase):
                 sys.version_info[1],
                 names.BEAM_FNAPI_CONTAINER_VERSION)))
 
-    # batch, legacy pipeline.
-    pipeline_options = pipeline_options = PipelineOptions([
-        '--temp_location',
-        'gs://any-location/temp',
-        '--experiments=disable_runner_v2_until_v2.50'
-    ])
-    env = apiclient.Environment(
-        [],  #packages
-        pipeline_options,
-        '2.0.0',  #any environment version
-        FAKE_PIPELINE_URL)
-    self.assertEqual(
-        env.proto.workerPools[0].workerHarnessContainerImage,
-        (
-            names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:%s' % (
-                sys.version_info[0],
-                sys.version_info[1],
-                names.BEAM_CONTAINER_VERSION)))
-
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.'
       'beam_version.__version__',
@@ -706,23 +687,6 @@ class UtilTest(unittest.TestCase):
             '/beam_python%d.%d_sdk:2.2.0' %
             (sys.version_info[0], sys.version_info[1])))
 
-    # batch, legacy pipeline.
-    pipeline_options = pipeline_options = PipelineOptions([
-        '--temp_location',
-        'gs://any-location/temp',
-        '--experiments=disable_runner_v2_until_v2.50'
-    ])
-    env = apiclient.Environment(
-        [],  #packages
-        pipeline_options,
-        '2.0.0',  #any environment version
-        FAKE_PIPELINE_URL)
-    self.assertEqual(
-        env.proto.workerPools[0].workerHarnessContainerImage,
-        (
-            names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:2.2.0' %
-            (sys.version_info[0], sys.version_info[1])))
-
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.'
       'beam_version.__version__',
@@ -757,23 +721,6 @@ class UtilTest(unittest.TestCase):
             '/beam_python%d.%d_sdk:2.2.0' %
             (sys.version_info[0], sys.version_info[1])))
 
-    # batch, legacy pipeline
-    pipeline_options = pipeline_options = PipelineOptions([
-        '--temp_location',
-        'gs://any-location/temp',
-        '--experiments=disable_runner_v2_until_v2.50'
-    ])
-    env = apiclient.Environment(
-        [],  #packages
-        pipeline_options,
-        '2.0.0',  #any environment version
-        FAKE_PIPELINE_URL)
-    self.assertEqual(
-        env.proto.workerPools[0].workerHarnessContainerImage,
-        (
-            names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:2.2.0' %
-            (sys.version_info[0], sys.version_info[1])))
-
   def test_worker_harness_override_takes_precedence_over_sdk_defaults(self):
     # streaming, fnapi pipeline.
     pipeline_options = PipelineOptions([
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py 
b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index c0444037de8..f86306eb276 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -42,79 +42,3 @@ BEAM_CONTAINER_VERSION = 'beam-master-20230629'
 BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20230705'
 
 DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'
-
-
-class TransformNames(object):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Transform strings as they are expected in the CloudWorkflow protos.
-  """
-  COLLECTION_TO_SINGLETON = 'CollectionToSingleton'
-  COMBINE = 'CombineValues'
-  CREATE_PCOLLECTION = 'CreateCollection'
-  DO = 'ParallelDo'
-  FLATTEN = 'Flatten'
-  GROUP = 'GroupByKey'
-  READ = 'ParallelRead'
-  WRITE = 'ParallelWrite'
-
-
-class PropertyNames(object):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Property strings as they are expected in the CloudWorkflow protos.
-  """
-  # If uses_keyed_state, whether the state can be sharded.
-  ALLOWS_SHARDABLE_STATE = 'allows_shardable_state'
-  BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
-  BIGQUERY_DATASET = 'dataset'
-  BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format'
-  BIGQUERY_FLATTEN_RESULTS = 'bigquery_flatten_results'
-  BIGQUERY_KMS_KEY = 'bigquery_kms_key'
-  BIGQUERY_PROJECT = 'project'
-  BIGQUERY_QUERY = 'bigquery_query'
-  BIGQUERY_SCHEMA = 'schema'
-  BIGQUERY_TABLE = 'table'
-  BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
-  BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
-  DISPLAY_DATA = 'display_data'
-  ELEMENT = 'element'
-  ELEMENTS = 'elements'
-  ENCODING = 'encoding'
-  FILE_PATTERN = 'filepattern'
-  FILE_NAME_PREFIX = 'filename_prefix'
-  FILE_NAME_SUFFIX = 'filename_suffix'
-  FORMAT = 'format'
-  INPUTS = 'inputs'
-  IMPULSE_ELEMENT = 'impulse_element'
-  NON_PARALLEL_INPUTS = 'non_parallel_inputs'
-  NUM_SHARDS = 'num_shards'
-  OUT = 'out'
-  OUTPUT = 'output'
-  OUTPUT_INFO = 'output_info'
-  OUTPUT_NAME = 'output_name'
-  PARALLEL_INPUT = 'parallel_input'
-  PIPELINE_PROTO_TRANSFORM_ID = 'pipeline_proto_transform_id'
-  # If the input element is a key/value pair, then the output element(s) all
-  # have the same key as the input.
-  PRESERVES_KEYS = 'preserves_keys'
-  PUBSUB_ID_LABEL = 'pubsub_id_label'
-  PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
-  PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
-  PUBSUB_TIMESTAMP_ATTRIBUTE = 'pubsub_timestamp_label'
-  PUBSUB_TOPIC = 'pubsub_topic'
-  RESOURCE_HINTS = 'resource_hints'
-  RESTRICTION_ENCODING = 'restriction_encoding'
-  SERIALIZED_FN = 'serialized_fn'
-  SHARD_NAME_TEMPLATE = 'shard_template'
-  SOURCE_STEP_INPUT = 'custom_source_step_input'
-  SERIALIZED_TEST_STREAM = 'serialized_test_stream'
-  STEP_NAME = 'step_name'
-  USE_INDEXED_FORMAT = 'use_indexed_format'
-  USER_FN = 'user_fn'
-  USER_NAME = 'user_name'
-  USES_KEYED_STATE = 'uses_keyed_state'
-  VALIDATE_SINK = 'validate_sink'
-  VALIDATE_SOURCE = 'validate_source'
-  VALUE = 'value'
-  WINDOWING_STRATEGY = 'windowing_strategy'
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py 
b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 1012a7d3624..8004762f5ee 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -19,101 +19,9 @@
 
 # pytype: skip-file
 
-from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.pipeline import PTransformOverride
 
 
-class CreatePTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``Create`` in streaming mode."""
-  def matches(self, applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import Create
-    return isinstance(applied_ptransform.transform, Create)
-
-  def get_replacement_transform_for_applied_ptransform(
-      self, applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import PTransform
-
-    ptransform = applied_ptransform.transform
-
-    # Return a wrapper rather than ptransform.as_read() directly to
-    # ensure backwards compatibility of the pipeline structure.
-    class LegacyCreate(PTransform):
-      def expand(self, pbegin):
-        return pbegin | ptransform.as_read()
-
-    return LegacyCreate().with_output_types(ptransform.get_output_type())
-
-
-class ReadPTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``Read(BoundedSource)``"""
-  def matches(self, applied_ptransform):
-    from apache_beam.io import Read
-    from apache_beam.io.iobase import BoundedSource
-    # Only overrides Read(BoundedSource) transform
-    if (isinstance(applied_ptransform.transform, Read) and
-        not getattr(applied_ptransform.transform, 'override', False)):
-      if isinstance(applied_ptransform.transform.source, BoundedSource):
-        return True
-    return False
-
-  def get_replacement_transform_for_applied_ptransform(
-      self, applied_ptransform):
-
-    from apache_beam import pvalue
-    from apache_beam.io import iobase
-
-    transform = applied_ptransform.transform
-
-    class Read(iobase.Read):
-      override = True
-
-      def expand(self, pbegin):
-        return pvalue.PCollection(
-            self.pipeline, is_bounded=self.source.is_bounded())
-
-    return Read(transform.source).with_output_types(
-        transform.get_type_hints().simple_output_type('Read'))
-
-
-class CombineValuesPTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``CombineValues``.
-
-  The DataflowRunner expects that the CombineValues PTransform acts as a
-  primitive. So this override replaces the CombineValues with a primitive.
-  """
-  def matches(self, applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import CombineValues
-
-    if isinstance(applied_ptransform.transform, CombineValues):
-      self.transform = applied_ptransform.transform
-      return True
-    return False
-
-  def get_replacement_transform(self, ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import PTransform
-    from apache_beam.pvalue import PCollection
-
-    # The DataflowRunner still needs access to the CombineValues members to
-    # generate a V1B3 proto representation, so we remember the transform from
-    # the matches method and forward it here.
-    class CombineValuesReplacement(PTransform):
-      def __init__(self, transform):
-        self.transform = transform
-
-      def expand(self, pcoll):
-        return PCollection.from_(pcoll)
-
-    return CombineValuesReplacement(self.transform)
-
-
 class NativeReadPTransformOverride(PTransformOverride):
   """A ``PTransformOverride`` for ``Read`` using native sources.
 
@@ -150,37 +58,3 @@ class NativeReadPTransformOverride(PTransformOverride):
     # will choose the incorrect coder for this transform.
     return Read(ptransform.source).with_output_types(
         ptransform.source.coder.to_type_hint())
-
-
-class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``GroupIntoBatches.WithShardedKey``.
-
-  This override simply returns the original transform but additionally records
-  the output PCollection in order to append required step properties during
-  graph translation.
-  """
-  def __init__(self, dataflow_runner, options):
-    self.dataflow_runner = dataflow_runner
-    self.options = options
-
-  def matches(self, applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import util
-
-    transform = applied_ptransform.transform
-
-    if not isinstance(transform, util.GroupIntoBatches.WithShardedKey):
-      return False
-
-    # The replacement is only valid for portable Streaming Engine jobs with
-    # runner v2.
-    standard_options = self.options.view_as(StandardOptions)
-    if not standard_options.streaming:
-      return False
-
-    self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform)
-    return True
-
-  def get_replacement_transform_for_applied_ptransform(self, ptransform):
-    return ptransform.transform


Reply via email to