shunping commented on issue #32930: URL: https://github.com/apache/beam/issues/32930#issuecomment-2803609322
I took a closer look at [`org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2`](https://github.com/apache/beam/blob/release-2.64/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java#L382) and found it is a lot trickier than the previous one. @lostluck: please feel free to correct me if I have misunderstood anything below.

To recap, this Java pipeline has a `GroupByKey` followed by a `Flatten`. The output coder of `Flatten` is `SerializableCoder`, while the input coder for `GroupByKey` is a standard KV coder.

When we handle `Flatten` in prism, we overwrite the coders of its input PCollections with the Flatten coder. This works in most cases:

- If the input PCollection is NOT from a Runner transform, the SDK worker is supposed to give us bytes encoded with the newly specified coder.
- If the input PCollection is from a Runner transform, then we have the following scenarios:
  - Flatten: We already fixed that in #34582 and #34602.
  - Impulse: The output type is bytes. Overwriting the coder of Impulse seems to have no effect.
  - GroupByKey: This is the main problem I am going to talk about below.

---

GroupByKey is a Runner transform. When prism handles it, it collects encoded bytes from the input PCollection, decodes and aggregates them, and puts the results into the output PCollection. The coder set on GroupByKey has no use, because the output coder is always `Coder[K, Iterable<V>]`, where K and V come from the input.

The error we see while running the test suggests that we try to decode the bytes with `SerializableCoder`, but the actual byte stream is not encoded that way. If I understand correctly, the Flatten output comes directly from the GroupByKey output, which is encoded with `Coder[K, Iterable<V>]`.
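As a minimal sketch of why this mismatch surfaces as a `StreamCorruptedException`: the stack trace in this comment shows `SerializableCoder.decode` constructing an `ObjectInputStream`, and that constructor rejects any input that does not begin with the Java serialization header. The JDK-only snippet below does not use Beam or prism, and its class and method names are hypothetical. It feeds such a stream bytes that were not produced by Java serialization. The `206E6F74` in the error is just the first four payload bytes in hex, which spell ` not` in ASCII.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, JDK-only illustration; this is not Beam or prism code.
class CoderMismatchSketch {

    // Stand-in for what a Java-serialization-based coder would write.
    static byte[] javaSerialize(Object value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Tries to read one object from the bytes. Returns null on success,
    // or the StreamCorruptedException message if the bytes are rejected.
    static String tryDecode(byte[] bytes) {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            ois.readObject();
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Bytes written by some other encoding (assumed here to be plain text).
        byte[] foreign = " not java serialization".getBytes(StandardCharsets.US_ASCII);
        // The header check fails on the first four bytes: 0x20 0x6E 0x6F 0x74.
        System.out.println(tryDecode(foreign)); // invalid stream header: 206E6F74

        // Bytes written by Java serialization decode without error.
        System.out.println(tryDecode(javaSerialize("ok"))); // null
    }
}
```

The point of the sketch is only that the decoder and the producer must agree on the encoding. It says nothing about where the actual bytes in the failing test come from.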
```
java.io.StreamCorruptedException: invalid stream header: 206E6F74
	at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:936)
	at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:375)
	at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:199)
	at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:57)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:185)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:543)
```

We cannot simply propagate the Flatten coder upstream of GroupByKey, because we do not know what is inside that `SerializableCoder`. It would be great if we could add an identity mapping on the SDK side between GroupByKey and Flatten, which takes `K, Iterable<V>` and emits bytes encoded by `SerializableCoder`, though I am not sure if this is practical.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org