shunping opened a new issue, #36387:
URL: https://github.com/apache/beam/issues/36387
### What happened?
Two GroupIntoBatches tests are currently excluded from the prism Java ValidatesRunner (VR) test suite:
- 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode'
- 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow'
The stacktrace is shown below:
```
bundle inst009 stage-001 failed:org.apache.beam.sdk.coders.CoderException: java.io.EOFException: reached end of stream after reading 7 bytes; 69 bytes expected
    at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:104)
    at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:37)
    at org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:847)
    at org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:838)
    at org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:784)
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:232)
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:186)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:542)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.EOFException: reached end of stream after reading 7 bytes; 69 bytes expected
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:802)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:784)
    at org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:60)
    at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
    ... 17 more
    at org.apache.beam.runners.prism.TestPrismRunner.run(TestPrismRunner.java:72)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
    at org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode(GroupIntoBatchesTest.java:438)
```
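For readers unfamiliar with this kind of failure, here is a minimal sketch of how a length-prefixed string decoder can produce an "end of stream" error when it is handed bytes that were not written in the format it expects. It is not Beam's `StringUtf8Coder`: the class and method names are hypothetical, and it assumes a single-byte length prefix as a simplification of a varint. It does not claim to reproduce the root cause of this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedDecodeSketch {
  // Reads a one-byte length prefix followed by that many UTF-8 bytes.
  // Assumption: lengths below 128, so a single byte stands in for a varint.
  static String decode(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    int length = in.readUnsignedByte();
    byte[] payload = new byte[length];
    in.readFully(payload); // throws EOFException if fewer than `length` bytes remain
    return new String(payload, StandardCharsets.UTF_8);
  }

  // Returns true when decoding runs out of bytes before the announced length is read.
  static boolean failsWithEof(byte[] encoded) throws IOException {
    try {
      decode(encoded);
      return false;
    } catch (EOFException e) {
      return true;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] wellFormed = {3, 'k', 'e', 'y'};          // prefix 3, then 3 payload bytes
    byte[] misaligned = {69, 1, 2, 3, 4, 5, 6, 7};   // first byte read as length 69, only 7 bytes follow
    System.out.println(decode(wellFormed));        // prints "key"
    System.out.println(failsWithEof(misaligned));  // prints "true"
  }
}
```

The second input mirrors the numbers in the trace (69 bytes expected, 7 available) purely for illustration. A mismatch like this usually means the reader and writer disagree about the encoding of the stream.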
After investigating, this appears to be a stage fusion problem in prism.
The input PCollection (after windowing) feeds two downstream branches:
- a DoFn that returns nothing:
  https://github.com/apache/beam/blob/5485467f230100e7ac2c0b50bda72b5e38ed9826/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java#L374
- the GroupIntoBatches transform:
  https://github.com/apache/beam/blob/5485467f230100e7ac2c0b50bda72b5e38ed9826/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java#L388
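However, in the graph after stage fusion, the DoFn and the windowing transform are
fused into a single stage (stage-001), and the windowing output becomes the output
of that stage. The stage containing GroupIntoBatches (stage-008) then consumes it
as its input: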
```
G stage-001:
* environment: beam:env:external:v1
S transforms : 2 []string
0: Window.Into()/Window.Assign
1: ParDo(Anonymous)/ParMultiDo(Anonymous)
...
G stage-008:
* environment: beam:env:external:v1
S transforms : 6 []string
0: GroupIntoBatches/ParDo(GroupIntoBatches)/ParMultiDo(GroupIntoBatches)
1: eCount-elements-in-windows-after-applying-GroupIntoBatches_lift
2: PAssert$1/GroupGlobally/Window.Into()/Window.Assign
3: PAssert$1/GroupGlobally/GatherAllOutputs/Reify.Window/ParDo(Anonymous)/ParMultiDo(Anonymous)
4: PAssert$1/GroupGlobally/GatherAllOutputs/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
5: PAssert$1/GroupGlobally/GatherAllOutputs/Window.Into()/Window.Assign
...
DEBUG em.AddStage
ID : stage-001
S inputs : 1 []string
0: TestStream.out
S sides : 0 []engine.LinkID
S outputs: 1 []string
0: Window.Into()/Window.Assign.out
...
DEBUG em.AddStage
ID : stage-008
S inputs : 1 []string
0: Window.Into()/Window.Assign.out
S sides : 0 []engine.LinkID
S outputs: 2 []string
0: PAssert$1/GroupGlobally/GatherAllOutputs/Window.Into()/Window.Assign.out
1: nCount-elements-in-windows-after-applying-GroupIntoBatches_lifted
```
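The shape described above can be checked mechanically. The following is a rough sketch, not prism code: it models each fused transform with a hypothetical `Transform` class and reports PCollections that are consumed both inside the stage that produces them and by a different stage. The stage and PCollection names are taken from the dump; the input/output wiring of the two ParDos is an assumption based on the description in this issue, and their own outputs are left empty for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FusionBoundarySketch {
  // Hypothetical model of a fused transform: its stage plus the PCollections it reads and writes.
  static final class Transform {
    final String stage;
    final String name;
    final List<String> inputs;
    final List<String> outputs;

    Transform(String stage, String name, List<String> inputs, List<String> outputs) {
      this.stage = stage;
      this.name = name;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  // Returns PCollections consumed both within their producing stage and by another stage.
  static List<String> sharedAcrossBoundary(List<Transform> transforms) {
    // Map each PCollection to the stage that produces it.
    Map<String, String> producerStage = new TreeMap<>();
    for (Transform t : transforms) {
      for (String out : t.outputs) {
        producerStage.put(out, t.stage);
      }
    }
    List<String> shared = new ArrayList<>();
    for (Map.Entry<String, String> e : producerStage.entrySet()) {
      boolean internal = false;
      boolean external = false;
      for (Transform t : transforms) {
        if (!t.inputs.contains(e.getKey())) {
          continue;
        }
        if (t.stage.equals(e.getValue())) {
          internal = true;
        } else {
          external = true;
        }
      }
      if (internal && external) {
        shared.add(e.getKey());
      }
    }
    return shared;
  }

  // Wiring assumed from the issue text: both ParDos read the windowed PCollection.
  static List<Transform> exampleFromIssue() {
    String windowed = "Window.Into()/Window.Assign.out";
    return List.of(
        new Transform("stage-001", "Window.Into()/Window.Assign",
            List.of("TestStream.out"), List.of(windowed)),
        new Transform("stage-001", "ParDo(Anonymous)/ParMultiDo(Anonymous)",
            List.of(windowed), List.of()),
        new Transform("stage-008",
            "GroupIntoBatches/ParDo(GroupIntoBatches)/ParMultiDo(GroupIntoBatches)",
            List.of(windowed), List.of()));
  }

  public static void main(String[] args) {
    // prints [Window.Into()/Window.Assign.out]
    System.out.println(sharedAcrossBoundary(exampleFromIssue()));
  }
}
```

With the wiring assumed here, `Window.Into()/Window.Assign.out` is the only PCollection flagged, which matches the stage-001 output and stage-008 input shown in the `em.AddStage` entries.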
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner