This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eaa869  [BEAM-7372][BEAM-9372] Cleanup py2 and py35 codepath from 
runners worker (#14730)
1eaa869 is described below

commit 1eaa8694528f44c8254992ad30da98aa8fb999cf
Author: yoshiki.obata <[email protected]>
AuthorDate: Wed May 19 06:28:09 2021 +0900

    [BEAM-7372][BEAM-9372] Cleanup py2 and py35 codepath from runners worker 
(#14730)
---
 sdks/python/apache_beam/runners/__init__.py        |   2 -
 sdks/python/apache_beam/runners/common.py          |  19 +---
 sdks/python/apache_beam/runners/common_test.py     |   2 -
 .../python/apache_beam/runners/pipeline_context.py |   3 -
 .../apache_beam/runners/pipeline_context_test.py   |   2 -
 .../portability/fn_api_runner/worker_handlers.py   |  10 +-
 sdks/python/apache_beam/runners/runner.py          |   3 -
 sdks/python/apache_beam/runners/runner_test.py     |   2 -
 sdks/python/apache_beam/runners/sdf_utils.py       |   4 -
 sdks/python/apache_beam/runners/sdf_utils_test.py  |   2 -
 sdks/python/apache_beam/runners/worker/__init__.py |   1 -
 .../apache_beam/runners/worker/bundle_processor.py |  27 ++----
 .../runners/worker/bundle_processor_test.py        |   2 -
 .../apache_beam/runners/worker/channel_factory.py  |   4 -
 .../apache_beam/runners/worker/data_plane.py       |  44 ++++-----
 .../apache_beam/runners/worker/data_plane_test.py  |   4 -
 .../apache_beam/runners/worker/log_handler.py      |   3 -
 .../apache_beam/runners/worker/log_handler_test.py |   3 -
 sdks/python/apache_beam/runners/worker/logger.py   |   2 -
 .../apache_beam/runners/worker/logger_test.py      |   4 -
 .../apache_beam/runners/worker/opcounters.py       |  11 +--
 .../apache_beam/runners/worker/opcounters_test.py  |   5 -
 .../apache_beam/runners/worker/operation_specs.py  |   2 -
 .../apache_beam/runners/worker/operations.py       |   5 -
 .../apache_beam/runners/worker/sdk_worker.py       | 102 ++++++++++++++++-----
 .../apache_beam/runners/worker/sdk_worker_main.py  |   3 -
 .../runners/worker/sdk_worker_main_test.py         |   4 -
 .../apache_beam/runners/worker/sdk_worker_test.py  |  21 ++---
 .../apache_beam/runners/worker/sideinputs.py       |   4 -
 .../apache_beam/runners/worker/sideinputs_test.py  |   4 -
 .../apache_beam/runners/worker/statecache.py       |   2 -
 .../apache_beam/runners/worker/statecache_test.py  |   2 -
 .../apache_beam/runners/worker/statesampler.py     |   2 -
 .../runners/worker/statesampler_slow.py            |   3 -
 .../runners/worker/statesampler_test.py            |   4 -
 .../runners/worker/worker_id_interceptor.py        |   4 -
 .../runners/worker/worker_id_interceptor_test.py   |   2 -
 .../apache_beam/runners/worker/worker_pool_main.py |   2 -
 .../apache_beam/runners/worker/worker_status.py    |   3 -
 .../runners/worker/worker_status_test.py           |   2 -
 40 files changed, 131 insertions(+), 199 deletions(-)

diff --git a/sdks/python/apache_beam/runners/__init__.py 
b/sdks/python/apache_beam/runners/__init__.py
index 0f278d1..f92d95a 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -20,8 +20,6 @@
 This package defines runners, which are used to execute a pipeline.
 """
 
-from __future__ import absolute_import
-
 from apache_beam.runners.direct.direct_runner import DirectRunner
 from apache_beam.runners.direct.test_direct_runner import TestDirectRunner
 from apache_beam.runners.runner import PipelineRunner
diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index 7703ae7..7b73326 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -23,16 +23,9 @@ For internal use only; no backwards-compatibility guarantees.
 """
 
 # pytype: skip-file
-
-from __future__ import absolute_import
-from __future__ import division
-
+import sys
 import threading
 import traceback
-from builtins import next
-from builtins import object
-from builtins import round
-from builtins import zip
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Dict
@@ -42,9 +35,6 @@ from typing import Mapping
 from typing import Optional
 from typing import Tuple
 
-from future.utils import raise_with_traceback
-from past.builtins import unicode
-
 from apache_beam.coders import TupleCoder
 from apache_beam.internal import util
 from apache_beam.options.value_provider import RuntimeValueProvider
@@ -1312,7 +1302,8 @@ class DoFnRunner:
           traceback.format_exception_only(type(exn), exn)[-1].strip() +
           step_annotation)
       new_exn._tagged_with_step = True
-    raise_with_traceback(new_exn)
+    _, _, tb = sys.exc_info()
+    raise new_exn.with_traceback(tb)
 
 
 class OutputProcessor(object):
@@ -1371,7 +1362,7 @@ class _OutputProcessor(OutputProcessor):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, (str, unicode)):
+        if not isinstance(tag, str):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
       if isinstance(result, WindowedValue):
@@ -1421,7 +1412,7 @@ class _OutputProcessor(OutputProcessor):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, (str, unicode)):
+        if not isinstance(tag, str):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
 
diff --git a/sdks/python/apache_beam/runners/common_test.py 
b/sdks/python/apache_beam/runners/common_test.py
index c9860e6..c089d5a 100644
--- a/sdks/python/apache_beam/runners/common_test.py
+++ b/sdks/python/apache_beam/runners/common_test.py
@@ -17,8 +17,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import unittest
 
 import hamcrest as hc
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py 
b/sdks/python/apache_beam/runners/pipeline_context.py
index be62545..7cefe39 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -23,9 +23,6 @@ For internal use only; no backwards-compatibility guarantees.
 # pytype: skip-file
 # mypy: disallow-untyped-defs
 
-from __future__ import absolute_import
-
-from builtins import object
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Dict
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py 
b/sdks/python/apache_beam/runners/pipeline_context_test.py
index 5e9334b..49ff6f7 100644
--- a/sdks/python/apache_beam/runners/pipeline_context_test.py
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -19,8 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import unittest
 
 from apache_beam import coders
diff --git 
a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index c63e5f0..b967e01 100644
--- 
a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ 
b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -316,7 +316,7 @@ class WorkerHandler(object):
       urn,  # type: str
       payload_type  # type: Optional[Type[T]]
   ):
-    # type: (...) -> Callable[[Callable[[T, sdk_worker.StateHandler, 
ExtendedProvisionInfo, GrpcServer], WorkerHandler]], Callable[[T, 
sdk_worker.StateHandler, ExtendedProvisionInfo, GrpcServer], WorkerHandler]]
+    # type: (...) -> Callable[[Type[WorkerHandler]], Callable[[T, 
sdk_worker.StateHandler, ExtendedProvisionInfo, GrpcServer], WorkerHandler]]
     def wrapper(constructor):
       # type: (Callable) -> Callable
       cls._registered_environments[urn] = constructor, payload_type  # type: 
ignore[assignment]
@@ -343,7 +343,7 @@ class WorkerHandler(object):
 # This takes a WorkerHandlerManager instead of GrpcServer, so it is not
 # compatible with WorkerHandler.register_environment.  There is a special case
 # in WorkerHandlerManager.get_worker_handlers() that allows it to work.
-@WorkerHandler.register_environment(python_urns.EMBEDDED_PYTHON, None)  # 
type: ignore[arg-type]
+@WorkerHandler.register_environment(python_urns.EMBEDDED_PYTHON, None)
 class EmbeddedWorkerHandler(WorkerHandler):
   """An in-memory worker_handler for fn API control, state and data planes."""
 
@@ -361,7 +361,7 @@ class EmbeddedWorkerHandler(WorkerHandler):
     state_cache = StateCache(STATE_CACHE_SIZE)
     self.bundle_processor_cache = sdk_worker.BundleProcessorCache(
         SingletonStateHandlerFactory(
-            sdk_worker.CachingStateHandler(state_cache, state)),
+            sdk_worker.GlobalCachingStateHandler(state_cache, state)),
         data_plane.InMemoryDataChannelFactory(
             self.data_plane_handler.inverse()),
         worker_manager._process_bundle_descriptors)
@@ -1053,6 +1053,10 @@ class 
StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer,
         pass
     return _Future.done()
 
+  def done(self):
+    # type: () -> None
+    pass
+
   @staticmethod
   def _to_key(state_key):
     # type: (beam_fn_api_pb2.StateKey) -> bytes
diff --git a/sdks/python/apache_beam/runners/runner.py 
b/sdks/python/apache_beam/runners/runner.py
index 02ed845..1030a53 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -19,15 +19,12 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import importlib
 import logging
 import os
 import shelve
 import shutil
 import tempfile
-from builtins import object
 from typing import TYPE_CHECKING
 from typing import Optional
 
diff --git a/sdks/python/apache_beam/runners/runner_test.py 
b/sdks/python/apache_beam/runners/runner_test.py
index 0923369..61fe400 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -24,8 +24,6 @@ caching and clearing values that are not tested elsewhere.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import unittest
 
 import apache_beam as beam
diff --git a/sdks/python/apache_beam/runners/sdf_utils.py 
b/sdks/python/apache_beam/runners/sdf_utils.py
index aa91bda..8ff9a35 100644
--- a/sdks/python/apache_beam/runners/sdf_utils.py
+++ b/sdks/python/apache_beam/runners/sdf_utils.py
@@ -19,12 +19,8 @@
 
 """Common utility class to help SDK harness to execute an SDF. """
 
-from __future__ import absolute_import
-from __future__ import division
-
 import logging
 import threading
-from builtins import object
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import NamedTuple
diff --git a/sdks/python/apache_beam/runners/sdf_utils_test.py 
b/sdks/python/apache_beam/runners/sdf_utils_test.py
index 0870680..a4510d7 100644
--- a/sdks/python/apache_beam/runners/sdf_utils_test.py
+++ b/sdks/python/apache_beam/runners/sdf_utils_test.py
@@ -19,8 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import time
 import unittest
 
diff --git a/sdks/python/apache_beam/runners/worker/__init__.py 
b/sdks/python/apache_beam/runners/worker/__init__.py
index 9fbf215..0bce5d6 100644
--- a/sdks/python/apache_beam/runners/worker/__init__.py
+++ b/sdks/python/apache_beam/runners/worker/__init__.py
@@ -16,4 +16,3 @@
 #
 
 """For internal use only; no backwards-compatibility guarantees."""
-from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py 
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index f05228e..d4c066b 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -19,10 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import base64
 import bisect
 import collections
@@ -31,8 +27,6 @@ import json
 import logging
 import random
 import threading
-from builtins import next
-from builtins import object
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Callable
@@ -52,7 +46,6 @@ from typing import TypeVar
 from typing import Union
 from typing import cast
 
-from future.utils import itervalues
 from google.protobuf import duration_pb2
 from google.protobuf import timestamp_pb2
 
@@ -174,7 +167,7 @@ class DataInputOperation(RunnerIOOperation):
   def __init__(self,
                operation_name,  # type: Union[str, common.NameContext]
                step_name,
-               consumers,  # type: Mapping[Any, Iterable[operations.Operation]]
+               consumers,  # type: Mapping[Any, List[operations.Operation]]
                counter_factory,  # type: counters.CounterFactory
                state_sampler,  # type: statesampler.StateSampler
                windowed_coder,  # type: coders.Coder
@@ -197,7 +190,7 @@ class DataInputOperation(RunnerIOOperation):
             self.counter_factory,
             self.name_context.step_name,
             0,
-            next(iter(itervalues(consumers))),
+            next(iter(consumers.values())),
             self.windowed_coder,
             self._get_runtime_performance_hints())
     ]
@@ -963,7 +956,7 @@ class BundleProcessor(object):
       # (transform_id, timer_family_id).
       data_channels = collections.defaultdict(
           list
-      )  # type: DefaultDict[data_plane.GrpcClientDataChannel, List[Union[str, 
Tuple[str, str]]]]
+      )  # type: DefaultDict[data_plane.DataChannel, List[Union[str, 
Tuple[str, str]]]]
 
       # Add expected data inputs for each data channel.
       input_op_by_transform_id = {}
@@ -976,13 +969,13 @@ class BundleProcessor(object):
         data_channels[self.timer_data_channel].extend(
             list(self.timers_info.keys()))
 
-      # Set up timer output stream for DoOperation.
-      for ((transform_id, timer_family_id),
-           timer_info) in self.timers_info.items():
-        output_stream = self.timer_data_channel.output_timer_stream(
-            instruction_id, transform_id, timer_family_id)
-        timer_info.output_stream = output_stream
-        self.ops[transform_id].add_timer_info(timer_family_id, timer_info)
+        # Set up timer output stream for DoOperation.
+        for ((transform_id, timer_family_id),
+             timer_info) in self.timers_info.items():
+          output_stream = self.timer_data_channel.output_timer_stream(
+              instruction_id, transform_id, timer_family_id)
+          timer_info.output_stream = output_stream
+          self.ops[transform_id].add_timer_info(timer_family_id, timer_info)
 
       # Process data and timer inputs
       for data_channel, expected_inputs in data_channels.items():
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py 
b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
index 4a3ce44..87802bc 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
@@ -18,8 +18,6 @@
 """Unit tests for bundle processing."""
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import unittest
 
 from apache_beam.runners.worker.bundle_processor import DataInputOperation
diff --git a/sdks/python/apache_beam/runners/worker/channel_factory.py 
b/sdks/python/apache_beam/runners/worker/channel_factory.py
index b3f31b1..cd859e6 100644
--- a/sdks/python/apache_beam/runners/worker/channel_factory.py
+++ b/sdks/python/apache_beam/runners/worker/channel_factory.py
@@ -18,10 +18,6 @@
 """Factory to create grpc channel."""
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import grpc
 
 
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py 
b/sdks/python/apache_beam/runners/worker/data_plane.py
index c106758..ffe48d1 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -20,10 +20,6 @@
 # pytype: skip-file
 # mypy: disallow-untyped-defs
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import abc
 import collections
 import logging
@@ -31,11 +27,11 @@ import queue
 import sys
 import threading
 import time
-from builtins import object
-from builtins import range
+from types import TracebackType
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Callable
+from typing import Collection
 from typing import DefaultDict
 from typing import Dict
 from typing import Iterable
@@ -49,8 +45,6 @@ from typing import Type
 from typing import Union
 
 import grpc
-from future.utils import raise_
-from future.utils import with_metaclass
 
 from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_fn_api_pb2
@@ -59,21 +53,16 @@ from apache_beam.runners.worker.channel_factory import 
GRPCChannelFactory
 from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
 
 if TYPE_CHECKING:
-  # TODO(BEAM-9372): move this out of the TYPE_CHECKING scope when we drop
-  #  support for python < 3.5.3
-  from types import TracebackType
-  ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
-  OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
-  # TODO: move this out of the TYPE_CHECKING scope when we drop support for
-  #  python < 3.6
-  from typing import Collection  # pylint: disable=ungrouped-imports
   import apache_beam.coders.slow_stream
   OutputStream = apache_beam.coders.slow_stream.OutputStream
-  DataOrTimers = \
-    Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timers]
+  DataOrTimers = Union[beam_fn_api_pb2.Elements.Data,
+                       beam_fn_api_pb2.Elements.Timers]
 else:
   OutputStream = type(coder_impl.create_OutputStream())
 
+ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
+OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
+
 # This module is experimental. No backwards-compatibility guarantees.
 
 _LOGGER = logging.getLogger(__name__)
@@ -226,7 +215,7 @@ class PeriodicThread(threading.Thread):
     self._finished.set()
 
 
-class DataChannel(with_metaclass(abc.ABCMeta, object)):  # type: ignore[misc]
+class DataChannel(metaclass=abc.ABCMeta):
   """Represents a channel for reading and writing data over the data plane.
 
   Read data and timer from this channel with the input_elements method::
@@ -424,7 +413,7 @@ class _GrpcDataChannel(DataChannel):
     self._receive_lock = threading.Lock()
     self._reads_finished = threading.Event()
     self._closed = False
-    self._exc_info = None  # type: Optional[OptExcInfo]
+    self._exc_info = (None, None, None)  # type: OptExcInfo
 
   def close(self):
     # type: () -> None
@@ -473,9 +462,9 @@ class _GrpcDataChannel(DataChannel):
             raise RuntimeError('Channel closed prematurely.')
           if abort_callback():
             return
-          if self._exc_info:
-            t, v, tb = self._exc_info
-            raise_(t, v, tb)
+          t, v, tb = self._exc_info
+          if t:
+            raise t(v).with_traceback(tb)
         else:
           if isinstance(element, beam_fn_api_pb2.Elements.Timers):
             if element.is_last:
@@ -641,7 +630,7 @@ class 
BeamFnDataServicer(beam_fn_api_pb2_grpc.BeamFnDataServicer):
       yield elements
 
 
-class DataChannelFactory(with_metaclass(abc.ABCMeta, object)):  # type: 
ignore[misc]
+class DataChannelFactory(metaclass=abc.ABCMeta):
   """An abstract factory for creating ``DataChannel``."""
   @abc.abstractmethod
   def create_data_channel(self, remote_grpc_port):
@@ -651,6 +640,13 @@ class DataChannelFactory(with_metaclass(abc.ABCMeta, 
object)):  # type: ignore[m
     raise NotImplementedError(type(self))
 
   @abc.abstractmethod
+  def create_data_channel_from_url(self, url):
+    # type: (str) -> Optional[GrpcClientDataChannel]
+
+    """Returns a ``DataChannel`` from the given url."""
+    raise NotImplementedError(type(self))
+
+  @abc.abstractmethod
   def close(self):
     # type: () -> None
 
diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py 
b/sdks/python/apache_beam/runners/worker/data_plane_test.py
index 1a83fe1..5124bb6 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -19,10 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import itertools
 import logging
 import time
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py 
b/sdks/python/apache_beam/runners/worker/log_handler.py
index ae01ff3..46157db 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -20,9 +20,6 @@
 # pytype: skip-file
 # mypy: disallow-untyped-defs
 
-from __future__ import absolute_import
-from __future__ import print_function
-
 import logging
 import math
 import queue
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py 
b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index dcae3f6..a045c0c 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -17,12 +17,9 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import logging
 import re
 import unittest
-from builtins import range
 
 import grpc
 
diff --git a/sdks/python/apache_beam/runners/worker/logger.py 
b/sdks/python/apache_beam/runners/worker/logger.py
index f650123..e171caf 100644
--- a/sdks/python/apache_beam/runners/worker/logger.py
+++ b/sdks/python/apache_beam/runners/worker/logger.py
@@ -22,8 +22,6 @@
 # pytype: skip-file
 # mypy: disallow-untyped-defs
 
-from __future__ import absolute_import
-
 import contextlib
 import json
 import logging
diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py 
b/sdks/python/apache_beam/runners/worker/logger_test.py
index 47a5ab5..158fd3b 100644
--- a/sdks/python/apache_beam/runners/worker/logger_test.py
+++ b/sdks/python/apache_beam/runners/worker/logger_test.py
@@ -19,15 +19,11 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import unicode_literals
-
 import json
 import logging
 import sys
 import threading
 import unittest
-from builtins import object
 
 from apache_beam.runners.worker import logger
 from apache_beam.runners.worker import statesampler
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py 
b/sdks/python/apache_beam/runners/worker/opcounters.py
index d761be4..bafbf9f 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -22,19 +22,13 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-
 import math
 import random
-from builtins import hex
-from builtins import object
+import sys
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Optional
 
-from future.utils import raise_with_traceback
-
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints.decorators import _check_instance_type
 from apache_beam.utils import counters
@@ -245,7 +239,8 @@ class OperationCounters(object):
         error_msg = (
             'Runtime type violation detected within %s: '
             '%s' % (transform_label, e))
-        raise_with_traceback(TypeCheckError(error_msg))
+        _, _, traceback = sys.exc_info()
+        raise TypeCheckError(error_msg).with_traceback(traceback)
 
   def do_sample(self, windowed_value):
     # type: (windowed_value.WindowedValue) -> None
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py 
b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index d3ab22d..75842fc 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -17,15 +17,10 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-
 import logging
 import math
 import random
 import unittest
-from builtins import object
-from builtins import range
 
 from apache_beam import coders
 from apache_beam.runners.worker import opcounters
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py 
b/sdks/python/apache_beam/runners/worker/operation_specs.py
index f1aa830..75fccd8 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -23,8 +23,6 @@ source, write to a sink, parallel do, etc.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import collections
 
 from apache_beam import coders
diff --git a/sdks/python/apache_beam/runners/worker/operations.py 
b/sdks/python/apache_beam/runners/worker/operations.py
index c88cdb5..19546dd 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -22,14 +22,9 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import collections
 import logging
 import threading
-from builtins import filter
-from builtins import object
-from builtins import zip
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import DefaultDict
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index a140fe2..2106d11 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -20,10 +20,6 @@
 # pytype: skip-file
 # mypy: disallow-untyped-defs
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import abc
 import collections
 import contextlib
@@ -34,8 +30,8 @@ import sys
 import threading
 import time
 import traceback
-from builtins import object
 from concurrent import futures
+from types import TracebackType
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Callable
@@ -54,8 +50,6 @@ from typing import TypeVar
 from typing import Union
 
 import grpc
-from future.utils import raise_
-from future.utils import with_metaclass
 
 from apache_beam.coders import coder_impl
 from apache_beam.metrics import monitoring_infos
@@ -76,14 +70,12 @@ from apache_beam.utils import thread_pool_executor
 from apache_beam.utils.sentinel import Sentinel
 
 if TYPE_CHECKING:
-  # TODO(BEAM-9372): move this out of the TYPE_CHECKING scope when we drop
-  #  support for python < 3.5.3
-  from types import TracebackType
-  ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
-  OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
   from apache_beam.portability.api import endpoints_pb2
   from apache_beam.utils.profiler import Profile
 
+ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
+OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
+
 T = TypeVar('T')
 _KT = TypeVar('_KT')
 _VT = TypeVar('_VT')
@@ -802,7 +794,7 @@ class SdkWorker(object):
       yield
 
 
-class StateHandler(with_metaclass(abc.ABCMeta, object)):  # type: ignore[misc]
+class StateHandler(metaclass=abc.ABCMeta):
   """An abstract object representing a ``StateHandler``."""
   @abc.abstractmethod
   def get_raw(
@@ -827,8 +819,19 @@ class StateHandler(with_metaclass(abc.ABCMeta, object)):  
# type: ignore[misc]
     # type: (beam_fn_api_pb2.StateKey) -> _Future
     raise NotImplementedError(type(self))
 
+  @abc.abstractmethod
+  @contextlib.contextmanager
+  def process_instruction_id(self, bundle_id):
+    # type: (str) -> Iterator[None]
+    raise NotImplementedError(type(self))
+
+  @abc.abstractmethod
+  def done(self):
+    # type: () -> None
+    raise NotImplementedError(type(self))
+
 
-class StateHandlerFactory(with_metaclass(abc.ABCMeta, object)):  # type: 
ignore[misc]
+class StateHandlerFactory(metaclass=abc.ABCMeta):
   """An abstract factory for creating ``DataChannel``."""
   @abc.abstractmethod
   def create_state_handler(self, api_service_descriptor):
@@ -883,7 +886,7 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
           # Add workerId to the grpc channel
           grpc_channel = grpc.intercept_channel(
               grpc_channel, WorkerIdInterceptor())
-          self._state_handler_cache[url] = CachingStateHandler(
+          self._state_handler_cache[url] = GlobalCachingStateHandler(
               self._state_cache,
               GrpcStateHandler(
                   beam_fn_api_pb2_grpc.BeamFnStateStub(grpc_channel)))
@@ -898,22 +901,67 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
     self._state_cache.evict_all()
 
 
-class ThrowingStateHandler(StateHandler):
-  """A state handler that errors on any requests."""
-  def get_raw(
+class CachingStateHandler(metaclass=abc.ABCMeta):
+  @abc.abstractmethod
+  @contextlib.contextmanager
+  def process_instruction_id(self, bundle_id, cache_tokens):
+    # type: (str, Iterable[beam_fn_api_pb2.ProcessBundleRequest.CacheToken]) 
-> Iterator[None]
+    raise NotImplementedError(type(self))
+
+  @abc.abstractmethod
+  def blocking_get(
       self,
       state_key,  # type: beam_fn_api_pb2.StateKey
-      continuation_token=None  # type: Optional[bytes]
+      coder,  # type: coder_impl.CoderImpl
   ):
-    # type: (...) -> Tuple[bytes, Optional[bytes]]
+    # type: (...) -> Iterable[Any]
+    raise NotImplementedError(type(self))
+
+  @abc.abstractmethod
+  def extend(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      coder,  # type: coder_impl.CoderImpl
+      elements,  # type: Iterable[Any]
+  ):
+    # type: (...) -> _Future
+    raise NotImplementedError(type(self))
+
+  @abc.abstractmethod
+  def clear(self, state_key):
+    # type: (beam_fn_api_pb2.StateKey) -> _Future
+    raise NotImplementedError(type(self))
+
+  @abc.abstractmethod
+  def done(self):
+    # type: () -> None
+    raise NotImplementedError(type(self))
+
+
+class ThrowingStateHandler(CachingStateHandler):
+  """A caching state handler that errors on any requests."""
+  @contextlib.contextmanager
+  def process_instruction_id(self, bundle_id, cache_tokens):
+    # type: (str, Iterable[beam_fn_api_pb2.ProcessBundleRequest.CacheToken]) 
-> Iterator[None]
+    raise RuntimeError(
+        'Unable to handle state requests for ProcessBundleDescriptor '
+        'for bundle id %s.' % bundle_id)
+
+  def blocking_get(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      coder,  # type: coder_impl.CoderImpl
+  ):
+    # type: (...) -> Iterable[Any]
     raise RuntimeError(
         'Unable to handle state requests for ProcessBundleDescriptor without '
         'state ApiServiceDescriptor for state key %s.' % state_key)
 
-  def append_raw(
+  def extend(
       self,
       state_key,  # type: beam_fn_api_pb2.StateKey
-      data  # type: bytes
+      coder,  # type: coder_impl.CoderImpl
+      elements,  # type: Iterable[Any]
   ):
     # type: (...) -> _Future
     raise RuntimeError(
@@ -926,6 +974,11 @@ class ThrowingStateHandler(StateHandler):
         'Unable to handle state requests for ProcessBundleDescriptor without '
         'state ApiServiceDescriptor for state key %s.' % state_key)
 
+  def done(self):
+    # type: () -> None
+    raise RuntimeError(
+        'Unable to handle state requests for ProcessBundleDescriptor.')
+
 
 class GrpcStateHandler(StateHandler):
 
@@ -1038,7 +1091,8 @@ class GrpcStateHandler(StateHandler):
     while not req_future.wait(timeout=1):
       if self._exc_info:
         t, v, tb = self._exc_info
-        raise_(t, v, tb)
+        if t and v and tb:
+          raise t(v).with_traceback(tb)
       elif self._done:
         raise RuntimeError()
     response = req_future.get()
@@ -1059,7 +1113,7 @@ class GrpcStateHandler(StateHandler):
     return str(request_id)
 
 
-class CachingStateHandler(object):
+class GlobalCachingStateHandler(CachingStateHandler):
   """ A State handler which retrieves and caches state.
    If caching is activated, caches across bundles using a supplied cache token.
    If activated but no cache token is supplied, caching is done at the bundle
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index e1f6635..8cd6077 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -19,8 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import http.server
 import json
 import logging
@@ -29,7 +27,6 @@ import re
 import sys
 import threading
 import traceback
-from builtins import object
 
 from google.protobuf import text_format  # type: ignore # not in typeshed
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
index b863703..fdd6638 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -19,10 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import logging
 import unittest
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 5972415..642eed2 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -19,16 +19,11 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import contextlib
 import logging
 import threading
 import time
 import unittest
-from builtins import range
 from collections import namedtuple
 
 import grpc
@@ -47,7 +42,7 @@ from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statecache
 from apache_beam.runners.worker import statesampler
 from apache_beam.runners.worker.sdk_worker import BundleProcessorCache
-from apache_beam.runners.worker.sdk_worker import CachingStateHandler
+from apache_beam.runners.worker.sdk_worker import GlobalCachingStateHandler
 from apache_beam.runners.worker.sdk_worker import SdkWorker
 from apache_beam.utils import thread_pool_executor
 from apache_beam.utils.counters import CounterName
@@ -362,7 +357,7 @@ class CachingStateHandlerTest(unittest.TestCase):
 
     underlying_state = FakeUnderlyingState()
     state_cache = statecache.StateCache(100)
-    caching_state_hander = sdk_worker.CachingStateHandler(
+    caching_state_hander = GlobalCachingStateHandler(
         state_cache, underlying_state)
 
     state1 = beam_fn_api_pb2.StateKey(
@@ -498,8 +493,7 @@ class CachingStateHandlerTest(unittest.TestCase):
 
     underlying_state_handler = self.UnderlyingStateHandler()
     state_cache = statecache.StateCache(100)
-    handler = sdk_worker.CachingStateHandler(
-        state_cache, underlying_state_handler)
+    handler = GlobalCachingStateHandler(state_cache, underlying_state_handler)
 
     def get():
       return handler.blocking_get(state, coder.get_impl())
@@ -529,8 +523,7 @@ class CachingStateHandlerTest(unittest.TestCase):
   def test_continuation_token(self):
     underlying_state_handler = self.UnderlyingStateHandler()
     state_cache = statecache.StateCache(100)
-    handler = sdk_worker.CachingStateHandler(
-        state_cache, underlying_state_handler)
+    handler = GlobalCachingStateHandler(state_cache, underlying_state_handler)
 
     coder = VarIntCoder()
 
@@ -558,10 +551,12 @@ class CachingStateHandlerTest(unittest.TestCase):
     underlying_state_handler.set_continuations(True)
     underlying_state_handler.set_values([45, 46, 47], coder)
     with handler.process_instruction_id('bundle', [cache_token]):
-      self.assertEqual(get_type(), CachingStateHandler.ContinuationIterable)
+      self.assertEqual(
+          get_type(), GlobalCachingStateHandler.ContinuationIterable)
       self.assertEqual(get(), [45, 46, 47])
       append(48, 49)
-      self.assertEqual(get_type(), CachingStateHandler.ContinuationIterable)
+      self.assertEqual(
+          get_type(), GlobalCachingStateHandler.ContinuationIterable)
       self.assertEqual(get(), [45, 46, 47, 48, 49])
       clear()
       self.assertEqual(get_type(), list)
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index 02ff4fd..7192ec1 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -19,15 +19,11 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import collections
 import logging
 import queue
 import threading
 import traceback
-from builtins import object
-from builtins import range
 
 from apache_beam.coders import observable
 from apache_beam.io import iobase
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 35e5d36..f609cf4 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -19,13 +19,9 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import logging
 import time
 import unittest
-from builtins import object
-from builtins import range
 
 import mock
 
diff --git a/sdks/python/apache_beam/runners/worker/statecache.py b/sdks/python/apache_beam/runners/worker/statecache.py
index 0fd8ac7..3ed0c15 100644
--- a/sdks/python/apache_beam/runners/worker/statecache.py
+++ b/sdks/python/apache_beam/runners/worker/statecache.py
@@ -19,8 +19,6 @@
 # pytype: skip-file
 # mypy: disallow-untyped-defs
 
-from __future__ import absolute_import
-
 import collections
 import logging
 import threading
diff --git a/sdks/python/apache_beam/runners/worker/statecache_test.py b/sdks/python/apache_beam/runners/worker/statecache_test.py
index d18dd6c..a1a175e 100644
--- a/sdks/python/apache_beam/runners/worker/statecache_test.py
+++ b/sdks/python/apache_beam/runners/worker/statecache_test.py
@@ -18,8 +18,6 @@
 """Tests for state caching."""
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import logging
 import unittest
 
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
index abb43a7..7230b24 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -19,8 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import contextlib
 import threading
 from typing import TYPE_CHECKING
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
index a27e00c..2451ab2 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -19,9 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
-from builtins import object
 from typing import Optional
 
 from apache_beam.runners import common
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index b34c9b5..c9ea7e8 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -18,13 +18,9 @@
 """Tests for state sampler."""
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-
 import logging
 import time
 import unittest
-from builtins import range
 
 from tenacity import retry
 from tenacity import stop_after_attempt
diff --git a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
index 4bcdb03..b913f2c 100644
--- a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
+++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
@@ -18,10 +18,6 @@
 """Client Interceptor to inject worker_id"""
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
 import collections
 import os
 from typing import Optional
diff --git a/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py b/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
index 258eac0..0db9c1b 100644
--- a/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
@@ -18,8 +18,6 @@
 """Test for WorkerIdInterceptor"""
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import collections
 import logging
 import unittest
diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index f4a4728..e5bfff8 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -29,8 +29,6 @@ This entry point is used by the Python SDK container in worker pool mode.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import argparse
 import atexit
 import logging
diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py b/sdks/python/apache_beam/runners/worker/worker_status.py
index 0fe8483..b699f07 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status.py
@@ -17,9 +17,6 @@
 
 """Worker status api handler for reporting SDK harness debug info."""
 
-from __future__ import absolute_import
-from __future__ import division
-
 import queue
 import sys
 import threading
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 686b2d6..2b4c7dc 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -15,8 +15,6 @@
 # limitations under the License.
 #
 
-from __future__ import absolute_import
-
 import logging
 import threading
 import unittest

Reply via email to