This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1eaa869 [BEAM-7372][BEAM-9372] Cleanup py2 and py35 codepath from
runners worker (#14730)
1eaa869 is described below
commit 1eaa8694528f44c8254992ad30da98aa8fb999cf
Author: yoshiki.obata <[email protected]>
AuthorDate: Wed May 19 06:28:09 2021 +0900
[BEAM-7372][BEAM-9372] Cleanup py2 and py35 codepath from runners worker
(#14730)
---
sdks/python/apache_beam/runners/__init__.py | 2 -
sdks/python/apache_beam/runners/common.py | 19 +---
sdks/python/apache_beam/runners/common_test.py | 2 -
.../python/apache_beam/runners/pipeline_context.py | 3 -
.../apache_beam/runners/pipeline_context_test.py | 2 -
.../portability/fn_api_runner/worker_handlers.py | 10 +-
sdks/python/apache_beam/runners/runner.py | 3 -
sdks/python/apache_beam/runners/runner_test.py | 2 -
sdks/python/apache_beam/runners/sdf_utils.py | 4 -
sdks/python/apache_beam/runners/sdf_utils_test.py | 2 -
sdks/python/apache_beam/runners/worker/__init__.py | 1 -
.../apache_beam/runners/worker/bundle_processor.py | 27 ++----
.../runners/worker/bundle_processor_test.py | 2 -
.../apache_beam/runners/worker/channel_factory.py | 4 -
.../apache_beam/runners/worker/data_plane.py | 44 ++++-----
.../apache_beam/runners/worker/data_plane_test.py | 4 -
.../apache_beam/runners/worker/log_handler.py | 3 -
.../apache_beam/runners/worker/log_handler_test.py | 3 -
sdks/python/apache_beam/runners/worker/logger.py | 2 -
.../apache_beam/runners/worker/logger_test.py | 4 -
.../apache_beam/runners/worker/opcounters.py | 11 +--
.../apache_beam/runners/worker/opcounters_test.py | 5 -
.../apache_beam/runners/worker/operation_specs.py | 2 -
.../apache_beam/runners/worker/operations.py | 5 -
.../apache_beam/runners/worker/sdk_worker.py | 102 ++++++++++++++++-----
.../apache_beam/runners/worker/sdk_worker_main.py | 3 -
.../runners/worker/sdk_worker_main_test.py | 4 -
.../apache_beam/runners/worker/sdk_worker_test.py | 21 ++---
.../apache_beam/runners/worker/sideinputs.py | 4 -
.../apache_beam/runners/worker/sideinputs_test.py | 4 -
.../apache_beam/runners/worker/statecache.py | 2 -
.../apache_beam/runners/worker/statecache_test.py | 2 -
.../apache_beam/runners/worker/statesampler.py | 2 -
.../runners/worker/statesampler_slow.py | 3 -
.../runners/worker/statesampler_test.py | 4 -
.../runners/worker/worker_id_interceptor.py | 4 -
.../runners/worker/worker_id_interceptor_test.py | 2 -
.../apache_beam/runners/worker/worker_pool_main.py | 2 -
.../apache_beam/runners/worker/worker_status.py | 3 -
.../runners/worker/worker_status_test.py | 2 -
40 files changed, 131 insertions(+), 199 deletions(-)
diff --git a/sdks/python/apache_beam/runners/__init__.py
b/sdks/python/apache_beam/runners/__init__.py
index 0f278d1..f92d95a 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -20,8 +20,6 @@
This package defines runners, which are used to execute a pipeline.
"""
-from __future__ import absolute_import
-
from apache_beam.runners.direct.direct_runner import DirectRunner
from apache_beam.runners.direct.test_direct_runner import TestDirectRunner
from apache_beam.runners.runner import PipelineRunner
diff --git a/sdks/python/apache_beam/runners/common.py
b/sdks/python/apache_beam/runners/common.py
index 7703ae7..7b73326 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -23,16 +23,9 @@ For internal use only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
-
-from __future__ import absolute_import
-from __future__ import division
-
+import sys
import threading
import traceback
-from builtins import next
-from builtins import object
-from builtins import round
-from builtins import zip
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
@@ -42,9 +35,6 @@ from typing import Mapping
from typing import Optional
from typing import Tuple
-from future.utils import raise_with_traceback
-from past.builtins import unicode
-
from apache_beam.coders import TupleCoder
from apache_beam.internal import util
from apache_beam.options.value_provider import RuntimeValueProvider
@@ -1312,7 +1302,8 @@ class DoFnRunner:
traceback.format_exception_only(type(exn), exn)[-1].strip() +
step_annotation)
new_exn._tagged_with_step = True
- raise_with_traceback(new_exn)
+ _, _, tb = sys.exc_info()
+ raise new_exn.with_traceback(tb)
class OutputProcessor(object):
@@ -1371,7 +1362,7 @@ class _OutputProcessor(OutputProcessor):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
- if not isinstance(tag, (str, unicode)):
+ if not isinstance(tag, str):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
if isinstance(result, WindowedValue):
@@ -1421,7 +1412,7 @@ class _OutputProcessor(OutputProcessor):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
- if not isinstance(tag, (str, unicode)):
+ if not isinstance(tag, str):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
diff --git a/sdks/python/apache_beam/runners/common_test.py
b/sdks/python/apache_beam/runners/common_test.py
index c9860e6..c089d5a 100644
--- a/sdks/python/apache_beam/runners/common_test.py
+++ b/sdks/python/apache_beam/runners/common_test.py
@@ -17,8 +17,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import unittest
import hamcrest as hc
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py
b/sdks/python/apache_beam/runners/pipeline_context.py
index be62545..7cefe39 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -23,9 +23,6 @@ For internal use only; no backwards-compatibility guarantees.
# pytype: skip-file
# mypy: disallow-untyped-defs
-from __future__ import absolute_import
-
-from builtins import object
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py
b/sdks/python/apache_beam/runners/pipeline_context_test.py
index 5e9334b..49ff6f7 100644
--- a/sdks/python/apache_beam/runners/pipeline_context_test.py
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -19,8 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import unittest
from apache_beam import coders
diff --git
a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index c63e5f0..b967e01 100644
---
a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++
b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -316,7 +316,7 @@ class WorkerHandler(object):
urn, # type: str
payload_type # type: Optional[Type[T]]
):
- # type: (...) -> Callable[[Callable[[T, sdk_worker.StateHandler,
ExtendedProvisionInfo, GrpcServer], WorkerHandler]], Callable[[T,
sdk_worker.StateHandler, ExtendedProvisionInfo, GrpcServer], WorkerHandler]]
+ # type: (...) -> Callable[[Type[WorkerHandler]], Callable[[T,
sdk_worker.StateHandler, ExtendedProvisionInfo, GrpcServer], WorkerHandler]]
def wrapper(constructor):
# type: (Callable) -> Callable
cls._registered_environments[urn] = constructor, payload_type # type:
ignore[assignment]
@@ -343,7 +343,7 @@ class WorkerHandler(object):
# This takes a WorkerHandlerManager instead of GrpcServer, so it is not
# compatible with WorkerHandler.register_environment. There is a special case
# in WorkerHandlerManager.get_worker_handlers() that allows it to work.
-@WorkerHandler.register_environment(python_urns.EMBEDDED_PYTHON, None) #
type: ignore[arg-type]
+@WorkerHandler.register_environment(python_urns.EMBEDDED_PYTHON, None)
class EmbeddedWorkerHandler(WorkerHandler):
"""An in-memory worker_handler for fn API control, state and data planes."""
@@ -361,7 +361,7 @@ class EmbeddedWorkerHandler(WorkerHandler):
state_cache = StateCache(STATE_CACHE_SIZE)
self.bundle_processor_cache = sdk_worker.BundleProcessorCache(
SingletonStateHandlerFactory(
- sdk_worker.CachingStateHandler(state_cache, state)),
+ sdk_worker.GlobalCachingStateHandler(state_cache, state)),
data_plane.InMemoryDataChannelFactory(
self.data_plane_handler.inverse()),
worker_manager._process_bundle_descriptors)
@@ -1053,6 +1053,10 @@ class
StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer,
pass
return _Future.done()
+ def done(self):
+ # type: () -> None
+ pass
+
@staticmethod
def _to_key(state_key):
# type: (beam_fn_api_pb2.StateKey) -> bytes
diff --git a/sdks/python/apache_beam/runners/runner.py
b/sdks/python/apache_beam/runners/runner.py
index 02ed845..1030a53 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -19,15 +19,12 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import importlib
import logging
import os
import shelve
import shutil
import tempfile
-from builtins import object
from typing import TYPE_CHECKING
from typing import Optional
diff --git a/sdks/python/apache_beam/runners/runner_test.py
b/sdks/python/apache_beam/runners/runner_test.py
index 0923369..61fe400 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -24,8 +24,6 @@ caching and clearing values that are not tested elsewhere.
# pytype: skip-file
-from __future__ import absolute_import
-
import unittest
import apache_beam as beam
diff --git a/sdks/python/apache_beam/runners/sdf_utils.py
b/sdks/python/apache_beam/runners/sdf_utils.py
index aa91bda..8ff9a35 100644
--- a/sdks/python/apache_beam/runners/sdf_utils.py
+++ b/sdks/python/apache_beam/runners/sdf_utils.py
@@ -19,12 +19,8 @@
"""Common utility class to help SDK harness to execute an SDF. """
-from __future__ import absolute_import
-from __future__ import division
-
import logging
import threading
-from builtins import object
from typing import TYPE_CHECKING
from typing import Any
from typing import NamedTuple
diff --git a/sdks/python/apache_beam/runners/sdf_utils_test.py
b/sdks/python/apache_beam/runners/sdf_utils_test.py
index 0870680..a4510d7 100644
--- a/sdks/python/apache_beam/runners/sdf_utils_test.py
+++ b/sdks/python/apache_beam/runners/sdf_utils_test.py
@@ -19,8 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import time
import unittest
diff --git a/sdks/python/apache_beam/runners/worker/__init__.py
b/sdks/python/apache_beam/runners/worker/__init__.py
index 9fbf215..0bce5d6 100644
--- a/sdks/python/apache_beam/runners/worker/__init__.py
+++ b/sdks/python/apache_beam/runners/worker/__init__.py
@@ -16,4 +16,3 @@
#
"""For internal use only; no backwards-compatibility guarantees."""
-from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index f05228e..d4c066b 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -19,10 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import base64
import bisect
import collections
@@ -31,8 +27,6 @@ import json
import logging
import random
import threading
-from builtins import next
-from builtins import object
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
@@ -52,7 +46,6 @@ from typing import TypeVar
from typing import Union
from typing import cast
-from future.utils import itervalues
from google.protobuf import duration_pb2
from google.protobuf import timestamp_pb2
@@ -174,7 +167,7 @@ class DataInputOperation(RunnerIOOperation):
def __init__(self,
operation_name, # type: Union[str, common.NameContext]
step_name,
- consumers, # type: Mapping[Any, Iterable[operations.Operation]]
+ consumers, # type: Mapping[Any, List[operations.Operation]]
counter_factory, # type: counters.CounterFactory
state_sampler, # type: statesampler.StateSampler
windowed_coder, # type: coders.Coder
@@ -197,7 +190,7 @@ class DataInputOperation(RunnerIOOperation):
self.counter_factory,
self.name_context.step_name,
0,
- next(iter(itervalues(consumers))),
+ next(iter(consumers.values())),
self.windowed_coder,
self._get_runtime_performance_hints())
]
@@ -963,7 +956,7 @@ class BundleProcessor(object):
# (transform_id, timer_family_id).
data_channels = collections.defaultdict(
list
- ) # type: DefaultDict[data_plane.GrpcClientDataChannel, List[Union[str,
Tuple[str, str]]]]
+ ) # type: DefaultDict[data_plane.DataChannel, List[Union[str,
Tuple[str, str]]]]
# Add expected data inputs for each data channel.
input_op_by_transform_id = {}
@@ -976,13 +969,13 @@ class BundleProcessor(object):
data_channels[self.timer_data_channel].extend(
list(self.timers_info.keys()))
- # Set up timer output stream for DoOperation.
- for ((transform_id, timer_family_id),
- timer_info) in self.timers_info.items():
- output_stream = self.timer_data_channel.output_timer_stream(
- instruction_id, transform_id, timer_family_id)
- timer_info.output_stream = output_stream
- self.ops[transform_id].add_timer_info(timer_family_id, timer_info)
+ # Set up timer output stream for DoOperation.
+ for ((transform_id, timer_family_id),
+ timer_info) in self.timers_info.items():
+ output_stream = self.timer_data_channel.output_timer_stream(
+ instruction_id, transform_id, timer_family_id)
+ timer_info.output_stream = output_stream
+ self.ops[transform_id].add_timer_info(timer_family_id, timer_info)
# Process data and timer inputs
for data_channel, expected_inputs in data_channels.items():
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
index 4a3ce44..87802bc 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
@@ -18,8 +18,6 @@
"""Unit tests for bundle processing."""
# pytype: skip-file
-from __future__ import absolute_import
-
import unittest
from apache_beam.runners.worker.bundle_processor import DataInputOperation
diff --git a/sdks/python/apache_beam/runners/worker/channel_factory.py
b/sdks/python/apache_beam/runners/worker/channel_factory.py
index b3f31b1..cd859e6 100644
--- a/sdks/python/apache_beam/runners/worker/channel_factory.py
+++ b/sdks/python/apache_beam/runners/worker/channel_factory.py
@@ -18,10 +18,6 @@
"""Factory to create grpc channel."""
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import grpc
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py
b/sdks/python/apache_beam/runners/worker/data_plane.py
index c106758..ffe48d1 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -20,10 +20,6 @@
# pytype: skip-file
# mypy: disallow-untyped-defs
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import abc
import collections
import logging
@@ -31,11 +27,11 @@ import queue
import sys
import threading
import time
-from builtins import object
-from builtins import range
+from types import TracebackType
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
+from typing import Collection
from typing import DefaultDict
from typing import Dict
from typing import Iterable
@@ -49,8 +45,6 @@ from typing import Type
from typing import Union
import grpc
-from future.utils import raise_
-from future.utils import with_metaclass
from apache_beam.coders import coder_impl
from apache_beam.portability.api import beam_fn_api_pb2
@@ -59,21 +53,16 @@ from apache_beam.runners.worker.channel_factory import
GRPCChannelFactory
from apache_beam.runners.worker.worker_id_interceptor import
WorkerIdInterceptor
if TYPE_CHECKING:
- # TODO(BEAM-9372): move this out of the TYPE_CHECKING scope when we drop
- # support for python < 3.5.3
- from types import TracebackType
- ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
- OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
- # TODO: move this out of the TYPE_CHECKING scope when we drop support for
- # python < 3.6
- from typing import Collection # pylint: disable=ungrouped-imports
import apache_beam.coders.slow_stream
OutputStream = apache_beam.coders.slow_stream.OutputStream
- DataOrTimers = \
- Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timers]
+ DataOrTimers = Union[beam_fn_api_pb2.Elements.Data,
+ beam_fn_api_pb2.Elements.Timers]
else:
OutputStream = type(coder_impl.create_OutputStream())
+ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
+OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
+
# This module is experimental. No backwards-compatibility guarantees.
_LOGGER = logging.getLogger(__name__)
@@ -226,7 +215,7 @@ class PeriodicThread(threading.Thread):
self._finished.set()
-class DataChannel(with_metaclass(abc.ABCMeta, object)): # type: ignore[misc]
+class DataChannel(metaclass=abc.ABCMeta):
"""Represents a channel for reading and writing data over the data plane.
Read data and timer from this channel with the input_elements method::
@@ -424,7 +413,7 @@ class _GrpcDataChannel(DataChannel):
self._receive_lock = threading.Lock()
self._reads_finished = threading.Event()
self._closed = False
- self._exc_info = None # type: Optional[OptExcInfo]
+ self._exc_info = (None, None, None) # type: OptExcInfo
def close(self):
# type: () -> None
@@ -473,9 +462,9 @@ class _GrpcDataChannel(DataChannel):
raise RuntimeError('Channel closed prematurely.')
if abort_callback():
return
- if self._exc_info:
- t, v, tb = self._exc_info
- raise_(t, v, tb)
+ t, v, tb = self._exc_info
+ if t:
+ raise t(v).with_traceback(tb)
else:
if isinstance(element, beam_fn_api_pb2.Elements.Timers):
if element.is_last:
@@ -641,7 +630,7 @@ class
BeamFnDataServicer(beam_fn_api_pb2_grpc.BeamFnDataServicer):
yield elements
-class DataChannelFactory(with_metaclass(abc.ABCMeta, object)): # type:
ignore[misc]
+class DataChannelFactory(metaclass=abc.ABCMeta):
"""An abstract factory for creating ``DataChannel``."""
@abc.abstractmethod
def create_data_channel(self, remote_grpc_port):
@@ -651,6 +640,13 @@ class DataChannelFactory(with_metaclass(abc.ABCMeta,
object)): # type: ignore[m
raise NotImplementedError(type(self))
@abc.abstractmethod
+ def create_data_channel_from_url(self, url):
+ # type: (str) -> Optional[GrpcClientDataChannel]
+
+ """Returns a ``DataChannel`` from the given url."""
+ raise NotImplementedError(type(self))
+
+ @abc.abstractmethod
def close(self):
# type: () -> None
diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py
b/sdks/python/apache_beam/runners/worker/data_plane_test.py
index 1a83fe1..5124bb6 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -19,10 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import itertools
import logging
import time
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py
b/sdks/python/apache_beam/runners/worker/log_handler.py
index ae01ff3..46157db 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -20,9 +20,6 @@
# pytype: skip-file
# mypy: disallow-untyped-defs
-from __future__ import absolute_import
-from __future__ import print_function
-
import logging
import math
import queue
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py
b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index dcae3f6..a045c0c 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -17,12 +17,9 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import logging
import re
import unittest
-from builtins import range
import grpc
diff --git a/sdks/python/apache_beam/runners/worker/logger.py
b/sdks/python/apache_beam/runners/worker/logger.py
index f650123..e171caf 100644
--- a/sdks/python/apache_beam/runners/worker/logger.py
+++ b/sdks/python/apache_beam/runners/worker/logger.py
@@ -22,8 +22,6 @@
# pytype: skip-file
# mypy: disallow-untyped-defs
-from __future__ import absolute_import
-
import contextlib
import json
import logging
diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py
b/sdks/python/apache_beam/runners/worker/logger_test.py
index 47a5ab5..158fd3b 100644
--- a/sdks/python/apache_beam/runners/worker/logger_test.py
+++ b/sdks/python/apache_beam/runners/worker/logger_test.py
@@ -19,15 +19,11 @@
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import unicode_literals
-
import json
import logging
import sys
import threading
import unittest
-from builtins import object
from apache_beam.runners.worker import logger
from apache_beam.runners.worker import statesampler
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py
b/sdks/python/apache_beam/runners/worker/opcounters.py
index d761be4..bafbf9f 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -22,19 +22,13 @@
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-
import math
import random
-from builtins import hex
-from builtins import object
+import sys
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional
-from future.utils import raise_with_traceback
-
from apache_beam.typehints import TypeCheckError
from apache_beam.typehints.decorators import _check_instance_type
from apache_beam.utils import counters
@@ -245,7 +239,8 @@ class OperationCounters(object):
error_msg = (
'Runtime type violation detected within %s: '
'%s' % (transform_label, e))
- raise_with_traceback(TypeCheckError(error_msg))
+ _, _, traceback = sys.exc_info()
+ raise TypeCheckError(error_msg).with_traceback(traceback)
def do_sample(self, windowed_value):
# type: (windowed_value.WindowedValue) -> None
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py
b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index d3ab22d..75842fc 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -17,15 +17,10 @@
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-
import logging
import math
import random
import unittest
-from builtins import object
-from builtins import range
from apache_beam import coders
from apache_beam.runners.worker import opcounters
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py
b/sdks/python/apache_beam/runners/worker/operation_specs.py
index f1aa830..75fccd8 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -23,8 +23,6 @@ source, write to a sink, parallel do, etc.
# pytype: skip-file
-from __future__ import absolute_import
-
import collections
from apache_beam import coders
diff --git a/sdks/python/apache_beam/runners/worker/operations.py
b/sdks/python/apache_beam/runners/worker/operations.py
index c88cdb5..19546dd 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -22,14 +22,9 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import collections
import logging
import threading
-from builtins import filter
-from builtins import object
-from builtins import zip
from typing import TYPE_CHECKING
from typing import Any
from typing import DefaultDict
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index a140fe2..2106d11 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -20,10 +20,6 @@
# pytype: skip-file
# mypy: disallow-untyped-defs
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import abc
import collections
import contextlib
@@ -34,8 +30,8 @@ import sys
import threading
import time
import traceback
-from builtins import object
from concurrent import futures
+from types import TracebackType
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
@@ -54,8 +50,6 @@ from typing import TypeVar
from typing import Union
import grpc
-from future.utils import raise_
-from future.utils import with_metaclass
from apache_beam.coders import coder_impl
from apache_beam.metrics import monitoring_infos
@@ -76,14 +70,12 @@ from apache_beam.utils import thread_pool_executor
from apache_beam.utils.sentinel import Sentinel
if TYPE_CHECKING:
- # TODO(BEAM-9372): move this out of the TYPE_CHECKING scope when we drop
- # support for python < 3.5.3
- from types import TracebackType
- ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
- OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
from apache_beam.portability.api import endpoints_pb2
from apache_beam.utils.profiler import Profile
+ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
+OptExcInfo = Union[ExcInfo, Tuple[None, None, None]]
+
T = TypeVar('T')
_KT = TypeVar('_KT')
_VT = TypeVar('_VT')
@@ -802,7 +794,7 @@ class SdkWorker(object):
yield
-class StateHandler(with_metaclass(abc.ABCMeta, object)): # type: ignore[misc]
+class StateHandler(metaclass=abc.ABCMeta):
"""An abstract object representing a ``StateHandler``."""
@abc.abstractmethod
def get_raw(
@@ -827,8 +819,19 @@ class StateHandler(with_metaclass(abc.ABCMeta, object)):
# type: ignore[misc]
# type: (beam_fn_api_pb2.StateKey) -> _Future
raise NotImplementedError(type(self))
+ @abc.abstractmethod
+ @contextlib.contextmanager
+ def process_instruction_id(self, bundle_id):
+ # type: (str) -> Iterator[None]
+ raise NotImplementedError(type(self))
+
+ @abc.abstractmethod
+ def done(self):
+ # type: () -> None
+ raise NotImplementedError(type(self))
+
-class StateHandlerFactory(with_metaclass(abc.ABCMeta, object)): # type:
ignore[misc]
+class StateHandlerFactory(metaclass=abc.ABCMeta):
"""An abstract factory for creating ``DataChannel``."""
@abc.abstractmethod
def create_state_handler(self, api_service_descriptor):
@@ -883,7 +886,7 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
# Add workerId to the grpc channel
grpc_channel = grpc.intercept_channel(
grpc_channel, WorkerIdInterceptor())
- self._state_handler_cache[url] = CachingStateHandler(
+ self._state_handler_cache[url] = GlobalCachingStateHandler(
self._state_cache,
GrpcStateHandler(
beam_fn_api_pb2_grpc.BeamFnStateStub(grpc_channel)))
@@ -898,22 +901,67 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
self._state_cache.evict_all()
-class ThrowingStateHandler(StateHandler):
- """A state handler that errors on any requests."""
- def get_raw(
+class CachingStateHandler(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ @contextlib.contextmanager
+ def process_instruction_id(self, bundle_id, cache_tokens):
+ # type: (str, Iterable[beam_fn_api_pb2.ProcessBundleRequest.CacheToken])
-> Iterator[None]
+ raise NotImplementedError(type(self))
+
+ @abc.abstractmethod
+ def blocking_get(
self,
state_key, # type: beam_fn_api_pb2.StateKey
- continuation_token=None # type: Optional[bytes]
+ coder, # type: coder_impl.CoderImpl
):
- # type: (...) -> Tuple[bytes, Optional[bytes]]
+ # type: (...) -> Iterable[Any]
+ raise NotImplementedError(type(self))
+
+ @abc.abstractmethod
+ def extend(
+ self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ coder, # type: coder_impl.CoderImpl
+ elements, # type: Iterable[Any]
+ ):
+ # type: (...) -> _Future
+ raise NotImplementedError(type(self))
+
+ @abc.abstractmethod
+ def clear(self, state_key):
+ # type: (beam_fn_api_pb2.StateKey) -> _Future
+ raise NotImplementedError(type(self))
+
+ @abc.abstractmethod
+ def done(self):
+ # type: () -> None
+ raise NotImplementedError(type(self))
+
+
+class ThrowingStateHandler(CachingStateHandler):
+ """A caching state handler that errors on any requests."""
+ @contextlib.contextmanager
+ def process_instruction_id(self, bundle_id, cache_tokens):
+ # type: (str, Iterable[beam_fn_api_pb2.ProcessBundleRequest.CacheToken])
-> Iterator[None]
+ raise RuntimeError(
+ 'Unable to handle state requests for ProcessBundleDescriptor '
+ 'for bundle id %s.' % bundle_id)
+
+ def blocking_get(
+ self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ coder, # type: coder_impl.CoderImpl
+ ):
+ # type: (...) -> Iterable[Any]
raise RuntimeError(
'Unable to handle state requests for ProcessBundleDescriptor without '
'state ApiServiceDescriptor for state key %s.' % state_key)
- def append_raw(
+ def extend(
self,
state_key, # type: beam_fn_api_pb2.StateKey
- data # type: bytes
+ coder, # type: coder_impl.CoderImpl
+ elements, # type: Iterable[Any]
):
# type: (...) -> _Future
raise RuntimeError(
@@ -926,6 +974,11 @@ class ThrowingStateHandler(StateHandler):
'Unable to handle state requests for ProcessBundleDescriptor without '
'state ApiServiceDescriptor for state key %s.' % state_key)
+ def done(self):
+ # type: () -> None
+ raise RuntimeError(
+ 'Unable to handle state requests for ProcessBundleDescriptor.')
+
class GrpcStateHandler(StateHandler):
@@ -1038,7 +1091,8 @@ class GrpcStateHandler(StateHandler):
while not req_future.wait(timeout=1):
if self._exc_info:
t, v, tb = self._exc_info
- raise_(t, v, tb)
+ if t and v and tb:
+ raise t(v).with_traceback(tb)
elif self._done:
raise RuntimeError()
response = req_future.get()
@@ -1059,7 +1113,7 @@ class GrpcStateHandler(StateHandler):
return str(request_id)
-class CachingStateHandler(object):
+class GlobalCachingStateHandler(CachingStateHandler):
""" A State handler which retrieves and caches state.
If caching is activated, caches across bundles using a supplied cache token.
If activated but no cache token is supplied, caching is done at the bundle
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index e1f6635..8cd6077 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -19,8 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import http.server
import json
import logging
@@ -29,7 +27,6 @@ import re
import sys
import threading
import traceback
-from builtins import object
from google.protobuf import text_format # type: ignore # not in typeshed
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
index b863703..fdd6638 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -19,10 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import logging
import unittest
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 5972415..642eed2 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -19,16 +19,11 @@
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import contextlib
import logging
import threading
import time
import unittest
-from builtins import range
from collections import namedtuple
import grpc
@@ -47,7 +42,7 @@ from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statecache
from apache_beam.runners.worker import statesampler
from apache_beam.runners.worker.sdk_worker import BundleProcessorCache
-from apache_beam.runners.worker.sdk_worker import CachingStateHandler
+from apache_beam.runners.worker.sdk_worker import GlobalCachingStateHandler
from apache_beam.runners.worker.sdk_worker import SdkWorker
from apache_beam.utils import thread_pool_executor
from apache_beam.utils.counters import CounterName
@@ -362,7 +357,7 @@ class CachingStateHandlerTest(unittest.TestCase):
underlying_state = FakeUnderlyingState()
state_cache = statecache.StateCache(100)
- caching_state_hander = sdk_worker.CachingStateHandler(
+ caching_state_hander = GlobalCachingStateHandler(
state_cache, underlying_state)
state1 = beam_fn_api_pb2.StateKey(
@@ -498,8 +493,7 @@ class CachingStateHandlerTest(unittest.TestCase):
underlying_state_handler = self.UnderlyingStateHandler()
state_cache = statecache.StateCache(100)
- handler = sdk_worker.CachingStateHandler(
- state_cache, underlying_state_handler)
+ handler = GlobalCachingStateHandler(state_cache, underlying_state_handler)
def get():
return handler.blocking_get(state, coder.get_impl())
@@ -529,8 +523,7 @@ class CachingStateHandlerTest(unittest.TestCase):
def test_continuation_token(self):
underlying_state_handler = self.UnderlyingStateHandler()
state_cache = statecache.StateCache(100)
- handler = sdk_worker.CachingStateHandler(
- state_cache, underlying_state_handler)
+ handler = GlobalCachingStateHandler(state_cache, underlying_state_handler)
coder = VarIntCoder()
@@ -558,10 +551,12 @@ class CachingStateHandlerTest(unittest.TestCase):
underlying_state_handler.set_continuations(True)
underlying_state_handler.set_values([45, 46, 47], coder)
with handler.process_instruction_id('bundle', [cache_token]):
- self.assertEqual(get_type(), CachingStateHandler.ContinuationIterable)
+ self.assertEqual(
+ get_type(), GlobalCachingStateHandler.ContinuationIterable)
self.assertEqual(get(), [45, 46, 47])
append(48, 49)
- self.assertEqual(get_type(), CachingStateHandler.ContinuationIterable)
+ self.assertEqual(
+ get_type(), GlobalCachingStateHandler.ContinuationIterable)
self.assertEqual(get(), [45, 46, 47, 48, 49])
clear()
self.assertEqual(get_type(), list)
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py
b/sdks/python/apache_beam/runners/worker/sideinputs.py
index 02ff4fd..7192ec1 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -19,15 +19,11 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import collections
import logging
import queue
import threading
import traceback
-from builtins import object
-from builtins import range
from apache_beam.coders import observable
from apache_beam.io import iobase
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 35e5d36..f609cf4 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -19,13 +19,9 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import logging
import time
import unittest
-from builtins import object
-from builtins import range
import mock
diff --git a/sdks/python/apache_beam/runners/worker/statecache.py
b/sdks/python/apache_beam/runners/worker/statecache.py
index 0fd8ac7..3ed0c15 100644
--- a/sdks/python/apache_beam/runners/worker/statecache.py
+++ b/sdks/python/apache_beam/runners/worker/statecache.py
@@ -19,8 +19,6 @@
# pytype: skip-file
# mypy: disallow-untyped-defs
-from __future__ import absolute_import
-
import collections
import logging
import threading
diff --git a/sdks/python/apache_beam/runners/worker/statecache_test.py
b/sdks/python/apache_beam/runners/worker/statecache_test.py
index d18dd6c..a1a175e 100644
--- a/sdks/python/apache_beam/runners/worker/statecache_test.py
+++ b/sdks/python/apache_beam/runners/worker/statecache_test.py
@@ -18,8 +18,6 @@
"""Tests for state caching."""
# pytype: skip-file
-from __future__ import absolute_import
-
import logging
import unittest
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py
b/sdks/python/apache_beam/runners/worker/statesampler.py
index abb43a7..7230b24 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -19,8 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import contextlib
import threading
from typing import TYPE_CHECKING
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
index a27e00c..2451ab2 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -19,9 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
-from builtins import object
from typing import Optional
from apache_beam.runners import common
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py
b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index b34c9b5..c9ea7e8 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -18,13 +18,9 @@
"""Tests for state sampler."""
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-
import logging
import time
import unittest
-from builtins import range
from tenacity import retry
from tenacity import stop_after_attempt
diff --git a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
index 4bcdb03..b913f2c 100644
--- a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
+++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
@@ -18,10 +18,6 @@
"""Client Interceptor to inject worker_id"""
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
import collections
import os
from typing import Optional
diff --git
a/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
b/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
index 258eac0..0db9c1b 100644
--- a/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
@@ -18,8 +18,6 @@
"""Test for WorkerIdInterceptor"""
# pytype: skip-file
-from __future__ import absolute_import
-
import collections
import logging
import unittest
diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index f4a4728..e5bfff8 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -29,8 +29,6 @@ This entry point is used by the Python SDK container in
worker pool mode.
# pytype: skip-file
-from __future__ import absolute_import
-
import argparse
import atexit
import logging
diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py
b/sdks/python/apache_beam/runners/worker/worker_status.py
index 0fe8483..b699f07 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status.py
@@ -17,9 +17,6 @@
"""Worker status api handler for reporting SDK harness debug info."""
-from __future__ import absolute_import
-from __future__ import division
-
import queue
import sys
import threading
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py
b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 686b2d6..2b4c7dc 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -15,8 +15,6 @@
# limitations under the License.
#
-from __future__ import absolute_import
-
import logging
import threading
import unittest