shunping opened a new pull request, #35862:
URL: https://github.com/apache/beam/pull/35862

   We received an internal report (internal bug id: 430560535) that 
CoGroupByKey (cogbk) is not honoring custom coders.
   
   Below is the code to reproduce.
   ```Python
   """Test Beam's use of coders for keys in CoGroupByKey."""
   import apache_beam as beam
   from apache_beam.options import pipeline_options
   
   
   class _Unpicklable:
   
     def __init__(self, value):
       self.value = value
   
     def __getstate__(self):
       raise NotImplementedError()
   
     def __setstate__(self, state):
       raise NotImplementedError()
   
     def __repr__(self):
       return f"Unpicklable({self.value})"
   
   
   class _UnpicklableCoder(beam.coders.Coder):
     """Encodes an _Unpicklable as the decimal string of its integer value."""
   
     def encode(self, value):
       return str(value.value).encode()
   
     def decode(self, encoded):
       return _Unpicklable(int(encoded.decode()))
   
     def to_type_hint(self):
       return _Unpicklable
   
     def is_deterministic(self):
       return True
   
   
   beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)
   
   
   def pipeline_fn(root):
     values = [_Unpicklable(i) for i in range(5)]
     xs = root | beam.Create(values) | beam.WithKeys(lambda x: x)
     return (
         {'x': xs}
         | beam.CoGroupByKey()
         | beam.FlatMapTuple(
             lambda k, tagged: (k.value, tagged['x'][0].value * 2))
         | beam.LogElements()
     )
   
   
   def main():
     options = pipeline_options.PipelineOptions(
         runner='DirectRunner',
         direct_num_workers=1,
         type_check_additional='all',
     )
     with beam.Pipeline(options=options) as pipeline:
       _ = pipeline_fn(pipeline)
   
   if __name__ == '__main__':
     main()
   ```
   
   Running this code raises a `NotImplementedError`, because Beam uses a 
pickle-based coder rather than the registered `_UnpicklableCoder`.
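
The failure mode can be illustrated without Beam. The following is a minimal standalone sketch, not Beam's actual code path: `Unpicklable` mirrors the `_Unpicklable` class from the repro, and the hypothetical helpers `encode`, `decode`, and `pickling_fails` stand in for the custom coder and for a pickle-based fallback. It shows that the custom encoding round-trips the object, while any attempt to pickle it raises `NotImplementedError`.

```python
import pickle


class Unpicklable:
  """Stand-in for the `_Unpicklable` class in the repro."""

  def __init__(self, value):
    self.value = value

  def __getstate__(self):
    # Pickling calls __getstate__, so this makes pickle.dumps() fail.
    raise NotImplementedError()


def encode(obj):
  # Same logic as `_UnpicklableCoder.encode` in the repro.
  return str(obj.value).encode()


def decode(data):
  # Same logic as `_UnpicklableCoder.decode` in the repro.
  return Unpicklable(int(data.decode()))


def pickling_fails(obj):
  """Returns True if pickling `obj` raises NotImplementedError."""
  try:
    pickle.dumps(obj)
  except NotImplementedError:
    return True
  return False


key = Unpicklable(3)
roundtrip = decode(encode(key))  # The custom encoding never touches pickle.
```

Any code path that falls back to pickling such an object fails in the same way, which is why the registered coder has to be selected for this type.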
   
   This PR fixes the problem by propagating type hints correctly.
   
   
   Related PR: #33932

