shunping commented on issue #32930:
URL: https://github.com/apache/beam/issues/32930#issuecomment-2803609322

   I took a closer look at [`org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2`](https://github.com/apache/beam/blob/release-2.64/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java#L382) and found it is a lot trickier than the previous one.
   
   @lostluck: please feel free to correct me if I misunderstand anything below. 
 
   
   
   To recap, this Java pipeline has a `GroupByKey` followed by a `Flatten`. The output coder of the `Flatten` is a `SerializableCoder`, while the input coder of the `GroupByKey` is a standard KV coder.
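   
   For context, here is a minimal sketch of the pipeline shape; it is my own approximation for illustration (construction only, assuming `String` keys and values), not the actual test code.
   
   ```java
   // Sketch of the pipeline shape under discussion: a GroupByKey whose KV output
   // feeds a Flatten, with the Flatten output coder overridden to SerializableCoder.
   // Approximation only; not the code of FlattenTest.
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.coders.KvCoder;
   import org.apache.beam.sdk.coders.SerializableCoder;
   import org.apache.beam.sdk.coders.StringUtf8Coder;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.Flatten;
   import org.apache.beam.sdk.transforms.GroupByKey;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.PCollectionList;
   import org.apache.beam.sdk.values.TypeDescriptor;
   
   public class GbkThenFlattenShape {
     public static void main(String[] args) {
       Pipeline p = Pipeline.create();
   
       // Keyed input with a standard KvCoder.
       PCollection<KV<String, String>> keyed =
           p.apply(
               Create.of(KV.of("k", "v1"), KV.of("k", "v2"))
                   .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
   
       // GroupByKey output, encoded as KV<K, Iterable<V>>.
       PCollection<KV<String, Iterable<String>>> grouped = keyed.apply(GroupByKey.create());
   
       // Flatten whose output coder is overridden to SerializableCoder.
       PCollectionList.of(grouped)
           .apply(Flatten.pCollections())
           .setCoder(SerializableCoder.of(new TypeDescriptor<KV<String, Iterable<String>>>() {}));
       // (Pipeline construction only; not run here.)
     }
   }
   ```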
   
   
   When prism handles `Flatten`, it overwrites the coders of the input PCollections with the Flatten output coder. This works in most cases:
   - If the input PCollection is NOT from a runner transform, the SDK worker is expected to give us bytes encoded with the newly specified coder.
   - If the input PCollection IS from a runner transform, we have the following scenarios:
     - Flatten: already fixed by #34582 and #34602.
     - Impulse: the output type is bytes, and overwriting the Impulse coder seems to have no effect.
     - GroupByKey: this is the main problem discussed below.
   
   ---
   
   `GroupByKey` is a runner transform. When prism handles it, it collects encoded bytes from the input PCollection, decodes and aggregates them, and puts the results into the output PCollection. The coder declared on the GroupByKey output is effectively unused, because that coder is always `Coder[K, Iterable<V>]`, where K and V come from the input.
   
   The error we see while running the test suggests that we try to decode the bytes with `SerializableCoder`, but the byte stream was not encoded with it. If I understand correctly, the Flatten output comes directly from the GroupByKey output, which is encoded with `Coder[K, Iterable<V>]`.
   
   ```
   java.io.StreamCorruptedException: invalid stream header: 206E6F74
        at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:936)
        at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:375)
        at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:199)
        at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:57)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:185)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:543)
   ```
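   
   To illustrate the mismatch outside of prism, here is a minimal sketch of my own (not part of the test): encode a `KV<K, Iterable<V>>` with a GroupByKey-style KV coder and try to decode the bytes with `SerializableCoder`. This assumes `String` keys and values.
   
   ```java
   // Minimal illustration of the coder mismatch, assuming String keys and values.
   // Bytes produced by a KvCoder do not start with a Java serialization stream
   // header, so SerializableCoder cannot decode them; the root cause of the
   // failure is the StreamCorruptedException shown in the stack trace above.
   import java.util.Arrays;
   import org.apache.beam.sdk.coders.IterableCoder;
   import org.apache.beam.sdk.coders.KvCoder;
   import org.apache.beam.sdk.coders.SerializableCoder;
   import org.apache.beam.sdk.coders.StringUtf8Coder;
   import org.apache.beam.sdk.util.CoderUtils;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.TypeDescriptor;
   
   public class CoderMismatchDemo {
     public static void main(String[] args) throws Exception {
       // The GroupByKey-style output coder: KV<K, Iterable<V>>.
       KvCoder<String, Iterable<String>> gbkOutputCoder =
           KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of()));
   
       KV<String, Iterable<String>> grouped = KV.of("k", Arrays.asList("v1", "v2"));
       byte[] encoded = CoderUtils.encodeToByteArray(gbkOutputCoder, grouped);
   
       // The Flatten output coder: SerializableCoder.
       SerializableCoder<KV<String, Iterable<String>>> flattenCoder =
           SerializableCoder.of(new TypeDescriptor<KV<String, Iterable<String>>>() {});
   
       // Fails: the encoded bytes are not a Java serialization stream.
       CoderUtils.decodeFromByteArray(flattenCoder, encoded);
     }
   }
   ```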
   
   We cannot simply propagate the Flatten coder upstream of the GroupByKey, because we do not know what's inside that `SerializableCoder`. It would be great if we could add an identity mapping on the SDK side between GroupByKey and Flatten, which takes `KV<K, Iterable<V>>` and emits bytes encoded by `SerializableCoder`, though I am not sure whether this is practical.
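   
   For what it's worth, here is a rough sketch of what that identity mapping could look like on the Java SDK side. The names are mine and hypothetical, not an existing Beam API; imports and the class wrapper are omitted, and it assumes the element type is `Serializable` so that `SerializableCoder` applies.
   
   ```java
   // Hypothetical identity remap between GroupByKey and Flatten: elements are
   // decoded with the GroupByKey output coder, passed through unchanged, and
   // re-encoded with the Flatten output coder declared below.
   static <T extends Serializable> PCollection<T> reEncodeForFlatten(
       PCollection<T> input, Coder<T> flattenOutputCoder) {
     return input
         .apply(
             "IdentityReEncode",
             ParDo.of(
                 new DoFn<T, T>() {
                   @ProcessElement
                   public void processElement(@Element T element, OutputReceiver<T> out) {
                     out.output(element); // identity; only the declared coder changes
                   }
                 }))
         .setCoder(flattenOutputCoder);
   }
   ```
   
   The point of the sketch is that the re-encoding happens on the SDK side, where the `SerializableCoder` is known, rather than inside prism.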
   

