[
https://issues.apache.org/jira/browse/BEAM-5324?focusedWorklogId=146101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146101
]
ASF GitHub Bot logged work on BEAM-5324:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Sep/18 19:11
Start Date: 20/Sep/18 19:11
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #6424: [BEAM-5324] Partially
port unpackaged modules to Python 3
URL: https://github.com/apache/beam/pull/6424
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/pipeline_test.py
b/sdks/python/apache_beam/pipeline_test.py
index 9d4e79a9a9a..6397820e6e9 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -22,6 +22,7 @@
import copy
import logging
import platform
+import sys
import unittest
from builtins import object
from builtins import range
@@ -390,6 +391,8 @@ def process(self, element):
assert_that(pcoll, equal_to([11, 12]))
pipeline.run()
+ @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+ 'fixed on Python 3')
def test_side_input_no_tag(self):
class TestDoFn(DoFn):
def process(self, element, prefix, suffix):
@@ -405,6 +408,8 @@ def process(self, element, prefix, suffix):
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
+ @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+ 'fixed on Python 3')
def test_side_input_tagged(self):
class TestDoFn(DoFn):
def process(self, element, prefix, suffix=DoFn.SideInputParam):
@@ -520,13 +525,14 @@ def test_dir(self):
options = Breakfast()
self.assertEquals(
set(['from_dictionary', 'get_all_options', 'slices', 'style',
- 'view_as', 'display_data', 'next']),
- set([attr for attr in dir(options) if not attr.startswith('_')]))
+ 'view_as', 'display_data']),
+ set([attr for attr in dir(options) if not attr.startswith('_') and
+ attr != 'next']))
self.assertEquals(
set(['from_dictionary', 'get_all_options', 'style', 'view_as',
- 'display_data', 'next']),
+ 'display_data']),
set([attr for attr in dir(options.view_as(Eggs))
- if not attr.startswith('_')]))
+ if not attr.startswith('_') and attr != 'next']))
class RunnerApiTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 9e39ca82c84..f6e0d558346 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -541,7 +541,7 @@ def expand_gbk(stages):
pipeline_components.pcollections[pcoll_id],
pipeline_components)
# This is used later to correlate the read and write.
- param = str("group:%s" % stage.name)
+ param = str("group:%s" % stage.name).encode('utf-8')
if stage.name not in pipeline_components.transforms:
pipeline_components.transforms[stage.name].CopyFrom(transform)
gbk_write = Stage(
@@ -583,7 +583,7 @@ def sink_flattens(stages):
transform = stage.transforms[0]
if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
# This is used later to correlate the read and writes.
- param = str("materialize:%s" % transform.unique_name)
+ param = str("materialize:%s" % transform.unique_name).encode('utf-8')
output_pcoll_id, = list(transform.outputs.values())
output_coder_id = pcollections[output_pcoll_id].coder_id
flatten_writes = []
@@ -731,7 +731,7 @@ def fuse(producer, consumer):
# Now try to fuse away all pcollections.
for pcoll, producer in producers_by_pcoll.items():
- pcoll_as_param = str("materialize:%s" % pcoll)
+ pcoll_as_param = str("materialize:%s" % pcoll).encode('utf-8')
write_pcoll = None
for consumer in consumers_by_pcoll[pcoll]:
producer = replacement(producer)
@@ -929,7 +929,7 @@ def extract_endpoints(stage):
# Store the required side inputs into state.
for (transform_id, tag), (pcoll_id, si) in data_side_input.items():
- actual_pcoll_id = pcoll_id[len("materialize:"):]
+ actual_pcoll_id = pcoll_id[len(b"materialize:"):]
value_coder = context.coders[safe_coders[
pipeline_components.pcollections[actual_pcoll_id].coder_id]]
elements_by_window = _WindowGroupingBuffer(si, value_coder)
@@ -945,14 +945,14 @@ def extract_endpoints(stage):
controller.state_handler.blocking_append(state_key, elements_data)
def get_buffer(pcoll_id):
- if pcoll_id.startswith('materialize:'):
+ if pcoll_id.startswith(b'materialize:'):
if pcoll_id not in pcoll_buffers:
# Just store the data chunks for replay.
pcoll_buffers[pcoll_id] = list()
- elif pcoll_id.startswith('group:'):
+ elif pcoll_id.startswith(b'group:'):
# This is a grouping write, create a grouping buffer if needed.
if pcoll_id not in pcoll_buffers:
- original_gbk_transform = pcoll_id.split(':', 1)[1]
+ original_gbk_transform = pcoll_id.split(b':', 1)[1]
transform_proto = pipeline_components.transforms[
original_gbk_transform]
input_pcoll = only_element(list(transform_proto.inputs.values()))
diff --git a/sdks/python/apache_beam/runners/runner.py
b/sdks/python/apache_beam/runners/runner.py
index 00ce3e6429b..b0bafa55d68 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -96,7 +96,7 @@ def create_runner(runner_name):
if '.' in runner_name:
module, runner = runner_name.rsplit('.', 1)
try:
- return getattr(__import__(module, {}, {}, [runner], -1), runner)()
+ return getattr(__import__(module, {}, {}, [runner], 0), runner)()
except ImportError:
if runner_name in _KNOWN_DATAFLOW_RUNNERS:
raise ImportError(
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 4b7e9cda1bf..2ec670f3499 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -108,7 +108,7 @@ def __init__(self, operation_name, step_name, consumers,
counter_factory,
self.receivers = [
operations.ConsumerSet(
self.counter_factory, self.name_context.step_name, 0,
- next(itervalues(consumers)), self.windowed_coder)]
+ next(iter(itervalues(consumers))), self.windowed_coder)]
def process(self, windowed_value):
self.output(windowed_value)
@@ -141,7 +141,7 @@ def __getitem__(self, window):
ptransform_id=self._transform_id,
side_input_id=self._tag,
window=self._target_window_coder.encode(target_window),
- key=''))
+ key=b''))
state_handler = self._state_handler
access_pattern = self._side_input_data.access_pattern
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py
b/sdks/python/apache_beam/runners/worker/operation_specs.py
index d64920f18fe..ebae476e9bc 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -58,7 +58,7 @@ def worker_printable_fields(workerproto):
return ['%s=%s' % (name, value)
# _asdict is the only way and cannot subclass this generated class
# pylint: disable=protected-access
- for name, value in workerproto._asdict().iteritems()
+ for name, value in workerproto._asdict().items()
# want to output value 0 but not None nor []
if (value or value == 0)
and name not in
diff --git a/sdks/python/apache_beam/runners/worker/operations.py
b/sdks/python/apache_beam/runners/worker/operations.py
index 0488fe928d3..dc2fe2b4c6e 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -495,13 +495,7 @@ def __init__(self, name_context, spec, counter_factory,
state_sampler):
# simpler than for the DoFn's of ParDo.
fn, args, kwargs = pickler.loads(self.spec.combine_fn)[:3]
self.combine_fn = curry_combine_fn(fn, args, kwargs)
- if (getattr(fn.add_input, 'im_func', None)
- is core.CombineFn.add_input.__func__):
- # Old versions of the SDK have CombineFns that don't implement add_input.
- self.combine_fn_add_input = (
- lambda a, e: self.combine_fn.add_inputs(a, [e]))
- else:
- self.combine_fn_add_input = self.combine_fn.add_input
+ self.combine_fn_add_input = self.combine_fn.add_input
# Optimization for the (known tiny accumulator, often wide keyspace)
# combine functions.
# TODO(b/36567833): Bound by in-memory size rather than key count.
diff --git a/sdks/python/apache_beam/transforms/ptransform.py
b/sdks/python/apache_beam/transforms/ptransform.py
index 7a53fbe25b0..4a89cbf8419 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -550,6 +550,7 @@ def to_runner_api(self, context, has_parts=False):
urn=urn,
payload=typed_param.SerializeToString()
if isinstance(typed_param, message.Message)
+ else typed_param.encode('utf-8') if isinstance(typed_param, str)
else typed_param)
@classmethod
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index eb020c4d16a..9cdfcc62c02 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -57,7 +57,7 @@ commands =
setenv =
BEAM_EXPERIMENTAL_PY3=1
modules =
-
apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability
+
apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test
commands =
python --version
pip --version
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 146101)
Time Spent: 2h 40m (was: 2.5h)
> Finish Python 3 porting for unpackaged files
> --------------------------------------------
>
> Key: BEAM-5324
> URL: https://issues.apache.org/jira/browse/BEAM-5324
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Robbe
> Assignee: Robbe
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)