This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8a3883f3 Also test registration of standard coders.
new bc872dc Merge pull request #8236 Test registration of standard coders.
8a3883f3 is described below
commit 8a3883f387407d71a9165b5eb00d8aa9c8c660d3
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Apr 5 13:55:49 2019 +0200
Also test registration of standard coders.
---
.../apache_beam/coders/standard_coders_test.py | 28 ++++++++++------------
1 file changed, 12 insertions(+), 16 deletions(-)
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 36f1d89..79c06f4 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -30,7 +30,8 @@ from builtins import map
import yaml
from apache_beam.coders import coder_impl
-from apache_beam.coders import coders
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners import pipeline_context
from apache_beam.transforms import window
from apache_beam.transforms.window import IntervalWindow
from apache_beam.utils import windowed_value
@@ -55,19 +56,6 @@ def _load_test_cases(test_yaml):
class StandardCodersTest(unittest.TestCase):
- _urn_to_coder_class = {
- 'beam:coder:bytes:v1': coders.BytesCoder,
- 'beam:coder:varint:v1': coders.VarIntCoder,
- 'beam:coder:kv:v1': lambda k, v: coders.TupleCoder((k, v)),
- 'beam:coder:interval_window:v1': coders.IntervalWindowCoder,
- 'beam:coder:iterable:v1': lambda t: coders.IterableCoder(t),
- 'beam:coder:global_window:v1': coders.GlobalWindowCoder,
- 'beam:coder:windowed_value:v1':
- lambda v, w: coders.WindowedValueCoder(v, w),
- 'beam:coder:timer:v1': coders._TimerCoder,
- 'beam:coder:double:v1': coders.FloatCoder,
- }
-
_urn_to_json_value_parser = {
'beam:coder:bytes:v1': lambda x: x.encode('utf-8'),
'beam:coder:varint:v1': lambda x: x,
@@ -128,8 +116,16 @@ class StandardCodersTest(unittest.TestCase):
value)
def parse_coder(self, spec):
- return self._urn_to_coder_class[spec['urn']](
- *[self.parse_coder(c) for c in spec.get('components', ())])
+ context = pipeline_context.PipelineContext()
+ coder_id = str(hash(str(spec)))
+ component_ids = [context.coders.get_id(self.parse_coder(c))
+ for c in spec.get('components', ())]
+ context.coders.put_proto(coder_id, beam_runner_api_pb2.Coder(
+ spec=beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=spec['urn'], payload=spec.get('payload'))),
+ component_coder_ids=component_ids))
+ return context.coders.get_by_id(coder_id)
def json_value_parser(self, coder_spec):
component_parsers = [