This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new da9432b [BEAM-5324] Partially port unpackaged modules to Python 3
(#6424)
da9432b is described below
commit da9432b197207ab39edc6cb720286a81153b6d3b
Author: Robbe Sneyders <[email protected]>
AuthorDate: Thu Sep 20 21:11:05 2018 +0200
[BEAM-5324] Partially port unpackaged modules to Python 3 (#6424)
* Partially port unpackaged
* Remove legacy code
---
sdks/python/apache_beam/pipeline_test.py | 14 ++++++++++----
.../apache_beam/runners/portability/fn_api_runner.py | 14 +++++++-------
sdks/python/apache_beam/runners/runner.py | 2 +-
sdks/python/apache_beam/runners/worker/bundle_processor.py | 4 ++--
sdks/python/apache_beam/runners/worker/operation_specs.py | 2 +-
sdks/python/apache_beam/runners/worker/operations.py | 8 +-------
sdks/python/apache_beam/transforms/ptransform.py | 1 +
sdks/python/tox.ini | 2 +-
8 files changed, 24 insertions(+), 23 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline_test.py
b/sdks/python/apache_beam/pipeline_test.py
index 9d4e79a..6397820 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import
import copy
import logging
import platform
+import sys
import unittest
from builtins import object
from builtins import range
@@ -390,6 +391,8 @@ class DoFnTest(unittest.TestCase):
assert_that(pcoll, equal_to([11, 12]))
pipeline.run()
+ @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+ 'fixed on Python 3')
def test_side_input_no_tag(self):
class TestDoFn(DoFn):
def process(self, element, prefix, suffix):
@@ -405,6 +408,8 @@ class DoFnTest(unittest.TestCase):
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
+ @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+ 'fixed on Python 3')
def test_side_input_tagged(self):
class TestDoFn(DoFn):
def process(self, element, prefix, suffix=DoFn.SideInputParam):
@@ -520,13 +525,14 @@ class PipelineOptionsTest(unittest.TestCase):
options = Breakfast()
self.assertEquals(
set(['from_dictionary', 'get_all_options', 'slices', 'style',
- 'view_as', 'display_data', 'next']),
- set([attr for attr in dir(options) if not attr.startswith('_')]))
+ 'view_as', 'display_data']),
+ set([attr for attr in dir(options) if not attr.startswith('_') and
+ attr != 'next']))
self.assertEquals(
set(['from_dictionary', 'get_all_options', 'style', 'view_as',
- 'display_data', 'next']),
+ 'display_data']),
set([attr for attr in dir(options.view_as(Eggs))
- if not attr.startswith('_')]))
+ if not attr.startswith('_') and attr != 'next']))
class RunnerApiTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 4f6b74b..32d7ade 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -609,7 +609,7 @@ class FnApiRunner(runner.PipelineRunner):
pipeline_components.pcollections[pcoll_id],
pipeline_components)
# This is used later to correlate the read and write.
- param = str("group:%s" % stage.name)
+ param = str("group:%s" % stage.name).encode('utf-8')
if stage.name not in pipeline_components.transforms:
pipeline_components.transforms[stage.name].CopyFrom(transform)
gbk_write = Stage(
@@ -651,7 +651,7 @@ class FnApiRunner(runner.PipelineRunner):
transform = stage.transforms[0]
if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
# This is used later to correlate the read and writes.
- param = str("materialize:%s" % transform.unique_name)
+ param = str("materialize:%s" % transform.unique_name).encode('utf-8')
output_pcoll_id, = list(transform.outputs.values())
output_coder_id = pcollections[output_pcoll_id].coder_id
flatten_writes = []
@@ -799,7 +799,7 @@ class FnApiRunner(runner.PipelineRunner):
# Now try to fuse away all pcollections.
for pcoll, producer in producers_by_pcoll.items():
- pcoll_as_param = str("materialize:%s" % pcoll)
+ pcoll_as_param = str("materialize:%s" % pcoll).encode('utf-8')
write_pcoll = None
for consumer in consumers_by_pcoll[pcoll]:
producer = replacement(producer)
@@ -1001,7 +1001,7 @@ class FnApiRunner(runner.PipelineRunner):
# Store the required side inputs into state.
for (transform_id, tag), (pcoll_id, si) in data_side_input.items():
- actual_pcoll_id = pcoll_id[len("materialize:"):]
+ actual_pcoll_id = pcoll_id[len(b"materialize:"):]
value_coder = context.coders[safe_coders[
pipeline_components.pcollections[actual_pcoll_id].coder_id]]
elements_by_window = _WindowGroupingBuffer(si, value_coder)
@@ -1017,14 +1017,14 @@ class FnApiRunner(runner.PipelineRunner):
controller.state_handler.blocking_append(state_key, elements_data)
def get_buffer(pcoll_id):
- if pcoll_id.startswith('materialize:'):
+ if pcoll_id.startswith(b'materialize:'):
if pcoll_id not in pcoll_buffers:
# Just store the data chunks for replay.
pcoll_buffers[pcoll_id] = list()
- elif pcoll_id.startswith('group:'):
+ elif pcoll_id.startswith(b'group:'):
# This is a grouping write, create a grouping buffer if needed.
if pcoll_id not in pcoll_buffers:
- original_gbk_transform = pcoll_id.split(':', 1)[1]
+ original_gbk_transform = pcoll_id.split(b':', 1)[1]
transform_proto = pipeline_components.transforms[
original_gbk_transform]
input_pcoll = only_element(list(transform_proto.inputs.values()))
diff --git a/sdks/python/apache_beam/runners/runner.py
b/sdks/python/apache_beam/runners/runner.py
index 00ce3e6..b0bafa5 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -96,7 +96,7 @@ def create_runner(runner_name):
if '.' in runner_name:
module, runner = runner_name.rsplit('.', 1)
try:
- return getattr(__import__(module, {}, {}, [runner], -1), runner)()
+ return getattr(__import__(module, {}, {}, [runner], 0), runner)()
except ImportError:
if runner_name in _KNOWN_DATAFLOW_RUNNERS:
raise ImportError(
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index cfc0490..ae63660 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -110,7 +110,7 @@ class DataInputOperation(RunnerIOOperation):
self.receivers = [
operations.ConsumerSet(
self.counter_factory, self.name_context.step_name, 0,
- next(itervalues(consumers)), self.windowed_coder)]
+ next(iter(itervalues(consumers))), self.windowed_coder)]
def process(self, windowed_value):
self.output(windowed_value)
@@ -160,7 +160,7 @@ class StateBackedSideInputMap(object):
ptransform_id=self._transform_id,
side_input_id=self._tag,
window=self._target_window_coder.encode(target_window),
- key=''))
+ key=b''))
state_handler = self._state_handler
access_pattern = self._side_input_data.access_pattern
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py
b/sdks/python/apache_beam/runners/worker/operation_specs.py
index d64920f..ebae476 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -58,7 +58,7 @@ def worker_printable_fields(workerproto):
return ['%s=%s' % (name, value)
# _asdict is the only way and cannot subclass this generated class
# pylint: disable=protected-access
- for name, value in workerproto._asdict().iteritems()
+ for name, value in workerproto._asdict().items()
# want to output value 0 but not None nor []
if (value or value == 0)
and name not in
diff --git a/sdks/python/apache_beam/runners/worker/operations.py
b/sdks/python/apache_beam/runners/worker/operations.py
index 4818859..41792cb 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -521,13 +521,7 @@ class PGBKCVOperation(Operation):
# simpler than for the DoFn's of ParDo.
fn, args, kwargs = pickler.loads(self.spec.combine_fn)[:3]
self.combine_fn = curry_combine_fn(fn, args, kwargs)
- if (getattr(fn.add_input, 'im_func', None)
- is core.CombineFn.add_input.__func__):
- # Old versions of the SDK have CombineFns that don't implement add_input.
- self.combine_fn_add_input = (
- lambda a, e: self.combine_fn.add_inputs(a, [e]))
- else:
- self.combine_fn_add_input = self.combine_fn.add_input
+ self.combine_fn_add_input = self.combine_fn.add_input
# Optimization for the (known tiny accumulator, often wide keyspace)
# combine functions.
# TODO(b/36567833): Bound by in-memory size rather than key count.
diff --git a/sdks/python/apache_beam/transforms/ptransform.py
b/sdks/python/apache_beam/transforms/ptransform.py
index 7a53fbe..4a89cbf 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -550,6 +550,7 @@ class PTransform(WithTypeHints, HasDisplayData):
urn=urn,
payload=typed_param.SerializeToString()
if isinstance(typed_param, message.Message)
+ else typed_param.encode('utf-8') if isinstance(typed_param, str)
else typed_param)
@classmethod
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index eb020c4..9cdfcc6 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -57,7 +57,7 @@ commands =
setenv =
BEAM_EXPERIMENTAL_PY3=1
modules =
-
apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability
+
apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test
commands =
python --version
pip --version