shunping opened a new pull request, #35862:
URL: https://github.com/apache/beam/pull/35862
We received an internal report (internal bug id: 430560535) that `CoGroupByKey` does not honor custom coders. The code below reproduces the problem.

```python
"""Test Beam's use of coders for keys in CoGroupByKey."""
import apache_beam as beam
from apache_beam.options import pipeline_options


class _Unpicklable:
  """A value type that cannot be pickled, so any pickle fallback fails loudly."""

  def __init__(self, value):
    self.value = value

  def __getstate__(self):
    raise NotImplementedError()

  def __setstate__(self, state):
    raise NotImplementedError()

  def __repr__(self):
    return f"Unpicklable({self.value})"


class _UnpicklableCoder(beam.coders.Coder):
  """Deterministic coder that encodes an _Unpicklable as its integer value."""

  def encode(self, value):
    return str(value.value).encode()

  def decode(self, encoded):
    return _Unpicklable(int(encoded.decode()))

  def to_type_hint(self):
    return _Unpicklable

  def is_deterministic(self):
    return True


# Register the custom coder so _Unpicklable values are encoded with it
# instead of being pickled.
beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)


def pipeline_fn(root):
  values = [_Unpicklable(i) for i in range(5)]
  # Each element becomes its own key: (_Unpicklable(i), _Unpicklable(i)).
  xs = root | beam.Create(values) | beam.WithKeys(lambda x: x)
  return (
      {'x': xs}
      | beam.CoGroupByKey()
      | beam.FlatMapTuple(
          lambda k, tagged: (k.value, tagged['x'][0].value * 2))
      | beam.LogElements())


def main():
  options = pipeline_options.PipelineOptions(
      runner='DirectRunner',
      direct_num_workers=1,
      type_check_additional='all')
  with beam.Pipeline(options=options) as pipeline:
    _ = pipeline_fn(pipeline)


if __name__ == '__main__':
  main()
```

Running this code raises `NotImplementedError`, because a pickle-based coder is used instead of the registered `_UnpicklableCoder`. This PR fixes the problem by propagating type hints correctly, so that the registered coder is selected.

Related PR: #33932
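The failure mode described above can be illustrated with a small, dependency-free toy model. This is a hypothetical sketch and not Beam's actual coder registry: `ToyCoderRegistry`, `PickleFallbackCoder`, `UnpicklableCoder`, and `round_trip_key` are illustrative names invented here. It assumes that the coder is chosen by looking up the element's type hint, and that a lost hint (modeled as `typing.Any`) falls back to a pickle-based coder. That fallback is a simplification of whatever generic coder Beam actually picks.

```python
"""Hypothetical toy model of coder lookup by type hint (not Beam's real API)."""
import pickle
from typing import Any, Dict


class Unpicklable:
    """Mirrors the repro's _Unpicklable: pickling it raises NotImplementedError."""

    def __init__(self, value: int) -> None:
        self.value = value

    def __getstate__(self):
        raise NotImplementedError()

    def __setstate__(self, state):
        raise NotImplementedError()


class PickleFallbackCoder:
    """Stand-in (assumption) for a generic coder that pickles arbitrary objects."""

    def encode(self, value: Any) -> bytes:
        return pickle.dumps(value)

    def decode(self, encoded: bytes) -> Any:
        return pickle.loads(encoded)


class UnpicklableCoder:
    """Stand-in for the repro's registered _UnpicklableCoder."""

    def encode(self, value: Unpicklable) -> bytes:
        return str(value.value).encode()

    def decode(self, encoded: bytes) -> Unpicklable:
        return Unpicklable(int(encoded.decode()))


class ToyCoderRegistry:
    """Maps a type hint to a coder class; unknown hints get the pickle fallback."""

    def __init__(self) -> None:
        self._coders: Dict[Any, type] = {}

    def register_coder(self, hint: type, coder_cls: type) -> None:
        self._coders[hint] = coder_cls

    def get_coder(self, hint: Any):
        # A hint with no registered coder (e.g. Any, after type
        # information was dropped) silently selects the fallback.
        return self._coders.get(hint, PickleFallbackCoder)()


def round_trip_key(registry: ToyCoderRegistry, key: Unpicklable, key_hint: Any) -> int:
    """Encodes and decodes a key with whichever coder its hint selects."""
    coder = registry.get_coder(key_hint)
    return coder.decode(coder.encode(key)).value


if __name__ == "__main__":
    registry = ToyCoderRegistry()
    registry.register_coder(Unpicklable, UnpicklableCoder)

    # Hint propagated: the registered coder is used and the key round-trips.
    print(round_trip_key(registry, Unpicklable(3), Unpicklable))  # prints 3

    # Hint lost: the pickle fallback is chosen and fails, as in the report.
    try:
        round_trip_key(registry, Unpicklable(3), Any)
    except NotImplementedError:
        print("fallback coder tried to pickle the key")
```

In this model the registered coder is never wrong. It simply is not consulted once the hint degrades to `Any`. That is why, under these assumptions, carrying the correct type hint through the transform is enough to avoid the pickling path.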