This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a3883f3 Also test registration of standard coders.
     new bc872dc  Merge pull request #8236 Test registration of standard coders.
8a3883f3 is described below

commit 8a3883f387407d71a9165b5eb00d8aa9c8c660d3
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Apr 5 13:55:49 2019 +0200

    Also test registration of standard coders.
---
 .../apache_beam/coders/standard_coders_test.py     | 28 ++++++++++------------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py 
b/sdks/python/apache_beam/coders/standard_coders_test.py
index 36f1d89..79c06f4 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -30,7 +30,8 @@ from builtins import map
 import yaml
 
 from apache_beam.coders import coder_impl
-from apache_beam.coders import coders
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners import pipeline_context
 from apache_beam.transforms import window
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.utils import windowed_value
@@ -55,19 +56,6 @@ def _load_test_cases(test_yaml):
 
 class StandardCodersTest(unittest.TestCase):
 
-  _urn_to_coder_class = {
-      'beam:coder:bytes:v1': coders.BytesCoder,
-      'beam:coder:varint:v1': coders.VarIntCoder,
-      'beam:coder:kv:v1': lambda k, v: coders.TupleCoder((k, v)),
-      'beam:coder:interval_window:v1': coders.IntervalWindowCoder,
-      'beam:coder:iterable:v1': lambda t: coders.IterableCoder(t),
-      'beam:coder:global_window:v1': coders.GlobalWindowCoder,
-      'beam:coder:windowed_value:v1':
-          lambda v, w: coders.WindowedValueCoder(v, w),
-      'beam:coder:timer:v1': coders._TimerCoder,
-      'beam:coder:double:v1': coders.FloatCoder,
-  }
-
   _urn_to_json_value_parser = {
       'beam:coder:bytes:v1': lambda x: x.encode('utf-8'),
       'beam:coder:varint:v1': lambda x: x,
@@ -128,8 +116,16 @@ class StandardCodersTest(unittest.TestCase):
                            value)
 
   def parse_coder(self, spec):
-    return self._urn_to_coder_class[spec['urn']](
-        *[self.parse_coder(c) for c in spec.get('components', ())])
+    context = pipeline_context.PipelineContext()
+    coder_id = str(hash(str(spec)))
+    component_ids = [context.coders.get_id(self.parse_coder(c))
+                     for c in spec.get('components', ())]
+    context.coders.put_proto(coder_id, beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.SdkFunctionSpec(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=spec['urn'], payload=spec.get('payload'))),
+        component_coder_ids=component_ids))
+    return context.coders.get_by_id(coder_id)
 
   def json_value_parser(self, coder_spec):
     component_parsers = [

Reply via email to