[
https://issues.apache.org/jira/browse/BEAM-5321?focusedWorklogId=172901&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172901
]
ASF GitHub Bot logged work on BEAM-5321:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/18 00:26
Start Date: 07/Dec/18 00:26
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #7104: [BEAM-5321] Port
transforms package to Python 3
URL: https://github.com/apache/beam/pull/7104
This is a PR merged from a forked repository.
Because this is a foreign pull request (from a fork), GitHub hides the
original diff on merge, so it is reproduced below for the sake of
provenance:
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py
b/sdks/python/apache_beam/transforms/ptransform_test.py
index 95ea03f2ba28..ffb43317e221 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -668,7 +668,7 @@ def test_chained_ptransforms(self):
def test_apply_to_list(self):
self.assertCountEqual(
[1, 2, 3], [0, 1, 2] | 'AddOne' >> beam.Map(lambda x: x + 1))
- self.assertItemsEqual([1],
+ self.assertCountEqual([1],
[0, 1, 2] | 'Odd' >> beam.Filter(lambda x: x % 2))
self.assertCountEqual([1, 2, 100, 3],
([1, 2, 3], [100]) | beam.Flatten())
@@ -947,7 +947,7 @@ def process(self, element, prefix):
| 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
self.assertEqual("Type hint violation for 'Upper': "
- "requires <type 'str'> but got <type 'int'> for element",
+ "requires {} but got {} for element".format(str, int),
e.exception.args[0])
def test_do_fn_pipeline_runtime_type_check_satisfied(self):
@@ -982,7 +982,7 @@ def process(self, element, num):
self.p.run()
self.assertEqual("Type hint violation for 'Add': "
- "requires <type 'int'> but got <type 'str'> for element",
+ "requires {} but got {} for element".format(int, str),
e.exception.args[0])
def test_pardo_does_not_type_check_using_type_hint_decorators(self):
@@ -999,7 +999,7 @@ def int_to_str(a):
| 'ToStr' >> beam.FlatMap(int_to_str))
self.assertEqual("Type hint violation for 'ToStr': "
- "requires <type 'int'> but got <type 'str'> for a",
+ "requires {} but got {} for a".format(int, str),
e.exception.args[0])
def test_pardo_properly_type_checks_using_type_hint_decorators(self):
@@ -1031,7 +1031,7 @@ def
test_pardo_does_not_type_check_using_type_hint_methods(self):
.with_input_types(str).with_output_types(str)))
self.assertEqual("Type hint violation for 'Upper': "
- "requires <type 'str'> but got <type 'int'> for x",
+ "requires {} but got {} for x".format(str, int),
e.exception.args[0])
def test_pardo_properly_type_checks_using_type_hint_methods(self):
@@ -1056,7 +1056,7 @@ def
test_map_does_not_type_check_using_type_hints_methods(self):
.with_input_types(str).with_output_types(str))
self.assertEqual("Type hint violation for 'Upper': "
- "requires <type 'str'> but got <type 'int'> for x",
+ "requires {} but got {} for x".format(str, int),
e.exception.args[0])
def test_map_properly_type_checks_using_type_hints_methods(self):
@@ -1082,7 +1082,7 @@ def upper(s):
| 'Upper' >> beam.Map(upper))
self.assertEqual("Type hint violation for 'Upper': "
- "requires <type 'str'> but got <type 'int'> for s",
+ "requires {} but got {} for s".format(str, int),
e.exception.args[0])
def test_map_properly_type_checks_using_type_hints_decorator(self):
@@ -1109,7 +1109,7 @@ def
test_filter_does_not_type_check_using_type_hints_method(self):
| 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
self.assertEqual("Type hint violation for 'Below 3': "
- "requires <type 'int'> but got <type 'str'> for x",
+ "requires {} but got {} for x".format(int, str),
e.exception.args[0])
def test_filter_type_checks_using_type_hints_method(self):
@@ -1134,7 +1134,7 @@ def more_than_half(a):
| 'Half' >> beam.Filter(more_than_half))
self.assertEqual("Type hint violation for 'Half': "
- "requires <type 'float'> but got <type 'int'> for a",
+ "requires {} but got {} for a".format(float, int),
e.exception.args[0])
def test_filter_type_checks_using_type_hints_decorator(self):
@@ -1183,7 +1183,7 @@ def test_group_by_key_only_does_not_type_check(self):
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
- "got <type 'int'>",
+ "got {}".format(int),
e.exception.args[0])
def test_group_by_does_not_type_check(self):
@@ -1260,8 +1260,8 @@ def int_to_string(x):
e.exception.args[0],
"Runtime type violation detected within ParDo(ToStr): "
"Type-hint for argument: 'x' violated. "
- "Expected an instance of <type 'int'>, "
- "instead found some_string, an instance of <type 'str'>.")
+ "Expected an instance of {}, "
+ "instead found some_string, an instance of {}.".format(int, str))
def test_run_time_type_checking_enabled_types_satisfied(self):
self.p._options.view_as(TypeOptions).pipeline_type_check = False
@@ -1351,8 +1351,8 @@ def
test_pipeline_runtime_checking_violation_simple_type_input(self):
e.exception.args[0],
"Runtime type violation detected within ParDo(ToInt): "
"Type-hint for argument: 'x' violated. "
- "Expected an instance of <type 'str'>, "
- "instead found 1, an instance of <type 'int'>.")
+ "Expected an instance of {}, "
+ "instead found 1, an instance of {}.".format(str, int))
def test_pipeline_runtime_checking_violation_composite_type_input(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1398,8 +1398,8 @@ def
test_pipeline_runtime_checking_violation_simple_type_output(self):
"Runtime type violation detected within "
"ParDo(ToInt): "
"According to type-hint expected output should be "
- "of type <type 'int'>. Instead, received '1.0', "
- "an instance of type <type 'float'>.")
+ "of type {}. Instead, received '1.0', "
+ "an instance of type {}.".format(int, float))
def test_pipeline_runtime_checking_violation_composite_type_output(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1441,8 +1441,8 @@ def add(a, b):
e.exception.args[0],
"Runtime type violation detected within ParDo(Add 1): "
"Type-hint for argument: 'b' violated. "
- "Expected an instance of <type 'int'>, "
- "instead found 1.0, an instance of <type 'float'>.")
+ "Expected an instance of {}, "
+ "instead found 1.0, an instance of {}.".format(int, float))
def
test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self): #
pylint: disable=line-too-long
self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1460,8 +1460,8 @@ def
test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self):
e.exception.args[0],
"Runtime type violation detected within ParDo(Add 1): "
"Type-hint for argument: 'one' violated. "
- "Expected an instance of <type 'int'>, "
- "instead found 1.0, an instance of <type 'float'>.")
+ "Expected an instance of {}, "
+ "instead found 1.0, an instance of {}.".format(int, float))
def test_combine_properly_pipeline_type_checks_using_decorator(self):
@with_output_types(int)
@@ -1491,7 +1491,7 @@ def bad_combine(a):
self.assertEqual(
"All functions for a Combine PTransform must accept a "
"single argument compatible with: Iterable[Any]. "
- "Instead a function with input type: <type 'int'> was received.",
+ "Instead a function with input type: {} was received.".format(int),
e.exception.args[0])
def test_combine_pipeline_type_propagation_using_decorators(self):
@@ -1550,7 +1550,7 @@ def iter_mul(ints):
"Runtime type violation detected within "
"Mul/CombinePerKey: "
"Type-hint for return type violated. "
- "Expected an instance of <type 'int'>, instead found")
+ "Expected an instance of {}, instead found".format(int))
def test_combine_pipeline_type_check_using_methods(self):
d = (self.p
@@ -1585,7 +1585,7 @@ def
test_combine_pipeline_type_check_violation_using_methods(self):
.with_input_types(str).with_output_types(str)))
self.assertEqual("Input type hint violation at SortJoin: "
- "expected <type 'str'>, got <type 'int'>",
+ "expected {}, got {}".format(str, int),
e.exception.args[0])
def test_combine_runtime_type_check_violation_using_methods(self):
@@ -1604,8 +1604,8 @@ def
test_combine_runtime_type_check_violation_using_methods(self):
"Runtime type violation detected within "
"ParDo(SortJoin/KeyWithVoid): "
"Type-hint for argument: 'v' violated. "
- "Expected an instance of <type 'str'>, "
- "instead found 0, an instance of <type 'int'>.")
+ "Expected an instance of {}, "
+ "instead found 0, an instance of {}.".format(str, int))
def test_combine_insufficient_type_hint_information(self):
self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
@@ -1638,11 +1638,18 @@ def test_mean_globally_pipeline_checking_violated(self):
| 'C' >> beam.Create(['test']).with_output_types(str)
| 'Mean' >> combine.Mean.Globally())
- self.assertEqual(
- "Type hint violation for 'CombinePerKey': "
- "requires Tuple[TypeVariable[K], Union[float, int, long]] "
- "but got Tuple[None, str] for element",
- e.exception.args[0])
+ if sys.version_info[0] >= 3:
+ expected_msg = \
+ "Type hint violation for 'CombinePerKey': " \
+ "requires Tuple[TypeVariable[K], Union[float, int]] " \
+ "but got Tuple[None, str] for element"
+ else:
+ expected_msg = \
+ "Type hint violation for 'CombinePerKey': " \
+ "requires Tuple[TypeVariable[K], Union[float, int, long]] " \
+ "but got Tuple[None, str] for element"
+
+ self.assertEqual(expected_msg, e.exception.args[0])
def test_mean_globally_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1695,11 +1702,18 @@ def test_mean_per_key_pipeline_checking_violated(self):
| 'EvenMean' >> combine.Mean.PerKey())
self.p.run()
- self.assertEqual(
- "Type hint violation for 'CombinePerKey(MeanCombineFn)': "
- "requires Tuple[TypeVariable[K], Union[float, int, long]] "
- "but got Tuple[str, str] for element",
- e.exception.args[0])
+ if sys.version_info[0] >= 3:
+ expected_msg = \
+ "Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
+ "requires Tuple[TypeVariable[K], Union[float, int]] " \
+ "but got Tuple[str, str] for element"
+ else:
+ expected_msg = \
+ "Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
+ "requires Tuple[TypeVariable[K], Union[float, int, long]] " \
+ "but got Tuple[str, str] for element"
+
+ self.assertEqual(expected_msg, e.exception.args[0])
def test_mean_per_key_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1726,14 +1740,24 @@ def test_mean_per_key_runtime_checking_violated(self):
| 'OddMean' >> combine.Mean.PerKey())
self.p.run()
- self.assertStartswith(
- e.exception.args[0],
- "Runtime type violation detected within "
- "OddMean/CombinePerKey(MeanCombineFn): "
- "Type-hint for argument: 'element' violated: "
- "Union[float, int, long] type-constraint violated. "
- "Expected an instance of one of: ('float', 'int', 'long'), "
- "received str instead")
+ if sys.version_info[0] >= 3:
+ expected_msg = \
+ "Runtime type violation detected within " \
+ "OddMean/CombinePerKey(MeanCombineFn): " \
+ "Type-hint for argument: 'element' violated: " \
+ "Union[float, int] type-constraint violated. " \
+ "Expected an instance of one of: ('float', 'int'), " \
+ "received str instead"
+ else:
+ expected_msg = \
+ "Runtime type violation detected within " \
+ "OddMean/CombinePerKey(MeanCombineFn): " \
+ "Type-hint for argument: 'element' violated: " \
+ "Union[float, int, long] type-constraint violated. " \
+ "Expected an instance of one of: ('float', 'int', 'long'), " \
+ "received str instead"
+
+ self.assertStartswith(e.exception.args[0], expected_msg)
def test_count_globally_pipeline_type_checking_satisfied(self):
d = (self.p
@@ -1775,7 +1799,7 @@ def
test_count_perkey_pipeline_type_checking_violated(self):
self.assertEqual(
"Type hint violation for 'CombinePerKey(CountCombineFn)': "
"requires Tuple[TypeVariable[K], Any] "
- "but got <type 'int'> for element",
+ "but got {} for element".format(int),
e.exception.args[0])
def test_count_perkey_runtime_type_checking_satisfied(self):
@@ -1856,7 +1880,7 @@ def test_per_key_pipeline_checking_violated(self):
self.assertEqual(
"Type hint violation for 'CombinePerKey(TopCombineFn)': "
"requires Tuple[TypeVariable[K], TypeVariable[T]] "
- "but got <type 'int'> for element",
+ "but got {} for element".format(int),
e.exception.args[0])
def test_per_key_pipeline_checking_satisfied(self):
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py
b/sdks/python/apache_beam/transforms/userstate_test.py
index d3d592f7c6f5..cb91cc0d24cb 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -29,6 +29,7 @@
from apache_beam.runners.common import DoFnSignature
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import equal_to
from apache_beam.transforms import userstate
from apache_beam.transforms.combiners import ToListCombineFn
from apache_beam.transforms.combiners import TopCombineFn
@@ -337,13 +338,13 @@ class SimpleTestStatefulDoFn(DoFn):
def process(self, element, buffer=DoFn.StateParam(BUFFER_STATE),
timer1=DoFn.TimerParam(EXPIRY_TIMER)):
unused_key, value = element
- buffer.add('A' + str(value))
+ buffer.add(b'A' + str(value).encode('latin1'))
timer1.set(20)
@on_timer(EXPIRY_TIMER)
def expiry_callback(self, buffer=DoFn.StateParam(BUFFER_STATE),
timer=DoFn.TimerParam(EXPIRY_TIMER)):
- yield ''.join(sorted(buffer.read()))
+ yield b''.join(sorted(buffer.read()))
with TestPipeline() as p:
test_stream = (TestStream()
@@ -362,7 +363,7 @@ def expiry_callback(self,
buffer=DoFn.StateParam(BUFFER_STATE),
# fire after the watermark passes time 20, and another time after element
# 4, since the timer issued at that point should fire immediately.
self.assertEqual(
- ['A1A2A3', 'A1A2A3A4'],
+ [b'A1A2A3', b'A1A2A3A4'],
StatefulDoFnOnDirectRunnerTest.all_records)
def test_stateful_dofn_nonkeyed_input(self):
@@ -495,7 +496,7 @@ def process(self, element,
state=DoFn.StateParam(BUFFER_STATE),
state.add(value)
timer.set(100)
else:
- yield 'Record<%s,%s,%s>' % (key, existing_values[0], value)
+ yield b'Record<%s,%s,%s>' % (key, existing_values[0], value)
state.clear()
timer.clear()
@@ -504,29 +505,28 @@ def expiry_callback(self,
state=DoFn.StateParam(BUFFER_STATE)):
buffered = list(state.read())
assert len(buffered) == 1, buffered
state.clear()
- yield 'Unmatched<%s>' % (buffered[0],)
+ yield b'Unmatched<%s>' % (buffered[0],)
with TestPipeline() as p:
test_stream = (TestStream()
.advance_watermark_to(10)
- .add_elements([('A', 'a'), ('B', 'b')])
- .add_elements([('A', 'aa'), ('C', 'c')])
+ .add_elements([(b'A', b'a'), (b'B', b'b')])
+ .add_elements([(b'A', b'aa'), (b'C', b'c')])
.advance_watermark_to(25)
- .add_elements([('A', 'aaa'), ('B', 'bb')])
- .add_elements([('D', 'd'), ('D', 'dd'), ('D', 'ddd'),
- ('D', 'dddd')])
+ .add_elements([(b'A', b'aaa'), (b'B', b'bb')])
+ .add_elements([(b'D', b'd'), (b'D', b'dd'), (b'D',
b'ddd'),
+ (b'D', b'dddd')])
.advance_watermark_to(125)
- .add_elements([('C', 'cc')]))
+ .add_elements([(b'C', b'cc')]))
(p
| test_stream
| beam.ParDo(HashJoinStatefulDoFn())
| beam.ParDo(self.record_dofn()))
- self.assertEqual(
- ['Record<A,a,aa>', 'Record<B,b,bb>', 'Record<D,d,dd>',
- 'Record<D,ddd,dddd>', 'Unmatched<aaa>', 'Unmatched<c>',
- 'Unmatched<cc>'],
- sorted(StatefulDoFnOnDirectRunnerTest.all_records))
+ equal_to(StatefulDoFnOnDirectRunnerTest.all_records)(
+ [b'Record<A,a,aa>', b'Record<B,b,bb>', b'Record<D,d,dd>',
+ b'Record<D,ddd,dddd>', b'Unmatched<aaa>', b'Unmatched<c>',
+ b'Unmatched<cc>'])
if __name__ == '__main__':
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a4aec5199c98..9eb6c30214e6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -102,16 +102,17 @@ def get_version():
cythonize = lambda *args, **kwargs: []
REQUIRED_PACKAGES_PY2_ONLY = [
- 'avro>=1.8.1,<2.0.0'
+ 'avro>=1.8.1,<2.0.0',
+ 'dill>=0.2.6,<=0.2.8.2',
]
REQUIRED_PACKAGES_PY3_ONLY = [
- 'avro-python3>=1.8.1,<2.0.0'
+ 'avro-python3>=1.8.1,<2.0.0',
+ 'dill==0.2.9.dev0'
]
REQUIRED_PACKAGES = [
'crcmod>=1.7,<2.0',
- 'dill>=0.2.6,<=0.2.8.2',
'fastavro>=0.21.4,<0.22',
'grpcio>=1.8,<2',
'hdfs>=2.1.0,<3.0.0',
@@ -148,8 +149,13 @@ def get_version():
if sys.version_info[0] == 2:
REQUIRED_PACKAGES = REQUIRED_PACKAGES + REQUIRED_PACKAGES_PY2_ONLY
+ DEPENDENCY_LINKS = []
elif sys.version_info[0] >= 3:
REQUIRED_PACKAGES = REQUIRED_PACKAGES + REQUIRED_PACKAGES_PY3_ONLY
+ # TODO(BEAM-6135): Revert when new dill version released
+ DEPENDENCY_LINKS = ['git+https://github.com/uqfoundation/dill.git'
+ '@7a73fbe3d6aa445f93f58f266687b7315d14a3ac'
+ '#egg=dill-0.2.9.dev0']
# We must generate protos after setup_requires are installed.
@@ -198,6 +204,7 @@ def run(self):
'apache_beam/utils/windowed_value.py',
]),
install_requires=REQUIRED_PACKAGES,
+ dependency_links=DEPENDENCY_LINKS,
python_requires=python_requires,
test_suite='nose.collector',
tests_require=REQUIRED_TEST_PACKAGES,
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 09c794b16db8..a357a2f12111 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -41,7 +41,7 @@ deps =
# Otherwise we get "OSError: [Errno 2] No such file or directory" errors.
# Source:
# https://github.com/tox-dev/tox/issues/123#issuecomment-284714629
-install_command = {envbindir}/python {envbindir}/pip install {opts} {packages}
+install_command = {envbindir}/python {envbindir}/pip install
--process-dependency-links {opts} {packages}
list_dependencies_command = {envbindir}/python {envbindir}/pip freeze
[testenv:py27]
@@ -58,7 +58,7 @@ setenv =
BEAM_EXPERIMENTAL_PY3=1
RUN_SKIPPED_PY3_TESTS=0
modules =
-
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.gcsfilesystem_test,apache_beam.io.gcp.gcsio_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.internal,apache_beam.io.filesystem_test,apache_beam.io.filesystems,apache_beam.io.range_trackers_test,apache_beam.io.sources_test
+
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.gcsfilesystem_test,apache_beam.io.gcp.gcsio_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.internal,apache_beam.io.filesystem_test,apache_beam.io.filesystems,apache_beam.io.range_trackers_test,apache_beam.io.sources_test,apache_beam.transforms
commands =
python --version
pip --version
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 172901)
Time Spent: 3h (was: 2h 50m)
> Finish Python 3 porting for transforms module
> ---------------------------------------------
>
> Key: BEAM-5321
> URL: https://issues.apache.org/jira/browse/BEAM-5321
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Robbe
> Assignee: Robbe
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 3h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)