Abacn commented on issue #29429: URL: https://github.com/apache/beam/issues/29429#issuecomment-1814712627
Comparing the logs, it seems the pipeline is executed differently.

Before:

```
INFO: Source: Query0.ReadUnbounded -> Flat Map -> Query0/Query0.Monitor/ParMultiDo(Monitor) -> Query0/Query0.Snoop/ParMultiDo(Anonymous) -> Query0/Query0/Query0.Serialize/ParMultiDo(Anonymous) -> Query0/Query0.Debug/ParMultiDo(Monitor) -> Query0/Query0.Stamp/ParMultiDo(Anonymous) -> Query0.Format/ParMultiDo(Anonymous) -> Query0.DevNull/ParMultiDo(Anonymous) (1/2) (b5360562850a3be1374b8798dfacfdce) switched from DEPLOYING to INITIALIZING.
Oct 27, 2023 4:25:11 PM org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper initializeState
INFO: No restore state for UnboundedSourceWrapper.
Oct 27, 2023 4:25:11 PM org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper initializeState
INFO: No restore state for UnboundedSourceWrapper.
Oct 27, 2023 4:25:11 PM org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper open
INFO: Unbounded Flink Source 1/2 is reading from sources: [UnboundedEventSource(0, 1000), UnboundedEventSource(2000, 3000), UnboundedEventSource(4000, 5000), UnboundedEventSource(6000, 7000), UnboundedEventSource(8000, 9000), UnboundedEventSource(10000, 11000), UnboundedEventSource(12000, 13000), UnboundedEventSource(14000, 15000), ...]
INFO: Source: Query0.ReadUnbounded -> Flat Map -> Query0/Query0.Monitor/ParMultiDo(Monitor) -> Query0/Query0.Snoop/ParMultiDo(Anonymous) -> Query0/Query0/Query0.Serialize/ParMultiDo(Anonymous) -> Query0/Query0.Debug/ParMultiDo(Monitor) -> Query0/Query0.Stamp/ParMultiDo(Anonymous) -> Query0.Format/ParMultiDo(Anonymous) -> Query0.DevNull/ParMultiDo(Anonymous) (2/2)#0 (6cb95a324b37d5272f769a25e93e5906) switched from INITIALIZING to RUNNING.
```

After:

```
INFO: Source: Query0.ReadUnbounded -> Flat Map -> Query0/Query0.Monitor/ParMultiDo(Monitor) -> Query0/Query0.Snoop/ParMultiDo(Anonymous) -> Query0/Query0/Query0.Serialize/ParMultiDo(Anonymous) -> Query0/Query0.Debug/ParMultiDo(Monitor) -> Query0/Query0.Stamp/ParMultiDo(Anonymous) -> Query0.Format/ParMultiDo(Anonymous) -> Query0.DevNull/ParMultiDo(Anonymous) (2/2) (eee384163299676ced9b2a87bf24bbca) switched from DEPLOYING to INITIALIZING.
Oct 27, 2023 10:22:48 PM org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader start
WARNING: AutoWatermarkInterval is not set, watermarks won't be emitted.
...
INFO: Source: Query0.ReadUnbounded -> Flat Map -> Query0/Query0.Monitor/ParMultiDo(Monitor) -> Query0/Query0.Snoop/ParMultiDo(Anonymous) -> Query0/Query0/Query0.Serialize/ParMultiDo(Anonymous) -> Query0/Query0.Debug/ParMultiDo(Monitor) -> Query0/Query0.Stamp/ParMultiDo(Anonymous) -> Query0.Format/ParMultiDo(Anonymous) -> Query0.DevNull/ParMultiDo(Anonymous) (2/2) (eee384163299676ced9b2a87bf24bbca) switched from INITIALIZING to RUNNING.
Oct 27, 2023 10:22:48 PM org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplitEnumerator assignSplitsAndLog
INFO: Assigned splits [[SplitIndex: 0, BeamSource: UnboundedEventSource(0, 1000)], [SplitIndex: 2, BeamSource: UnboundedEventSource(2000, 3000)], [SplitIndex: 4, BeamSource: UnboundedEventSource(4000, 5000)], [SplitIndex: 6, BeamSource: UnboundedEventSource(6000, 7000)], [SplitIndex: 8, BeamSource: UnboundedEventSource(8000, 9000)], [SplitIndex: 10, BeamSource: UnboundedEventSource(10000, 11000)], [SplitIndex: 12, BeamSource: UnboundedEventSource(12000, 13000)], [SplitIndex: 14, BeamSource:
```

In particular, the UnboundedEventSource splits now appear at a different point in the task lifecycle:

- **Before:** they are logged during INITIALIZING, from `org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper open`.
- **After:** they are logged only once the task has switched to RUNNING, from `org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplitEnumerator assignSplitsAndLog`.

Based on #28614, the PR should only have changed batch, but it's breaking streaming. @jto could you please investigate? Thanks!
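The Java helper below is hypothetical and is not part of Beam or Flink. It makes the before/after log comparison mechanical.

- It walks the log lines in order and remembers the target state of the most recent `switched from X to Y` transition.
- When the first `UnboundedEventSource(start, end)` entry shows up, it reports that remembered state.
- It also collects the split ranges, so the two runs can be diffed.

It is a rough heuristic with these assumptions:

- Each log record sits on one line.
- Subtask indices are ignored.
- The sample lines are shortened copies of the excerpts. The operator chain is elided and the split lists are cut to two entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical log-comparison helper; not part of Beam or Flink.
 * Heuristic: ignores subtask indices and assumes one log record per line.
 */
final class SplitLogTiming {

  // Task state transitions, e.g. "switched from DEPLOYING to INITIALIZING."
  private static final Pattern TRANSITION =
      Pattern.compile("switched from (\\w+) to (\\w+)\\.");

  // Split ranges as printed by both code paths, e.g. "UnboundedEventSource(0, 1000)".
  private static final Pattern RANGE =
      Pattern.compile("UnboundedEventSource\\((\\d+), (\\d+)\\)");

  private SplitLogTiming() {}

  /** Target state of the latest transition seen before the first split line, or null. */
  static String stateAtFirstSplitLog(List<String> lines) {
    String state = null;
    for (String line : lines) {
      Matcher t = TRANSITION.matcher(line);
      if (t.find()) {
        state = t.group(2); // remember the state the task just entered
      } else if (RANGE.matcher(line).find()) {
        return state; // first line that mentions a split range
      }
    }
    return null;
  }

  /** All "start-end" ranges mentioned in the lines, in order of appearance. */
  static List<String> ranges(List<String> lines) {
    List<String> out = new ArrayList<>();
    for (String line : lines) {
      Matcher m = RANGE.matcher(line);
      while (m.find()) {
        out.add(m.group(1) + "-" + m.group(2));
      }
    }
    return out;
  }

  // Shortened lines from the "before" excerpt (operator chain and split list cut down).
  static final List<String> BEFORE = List.of(
      "INFO: Source: Query0.ReadUnbounded -> ... (1/2) (b5360562850a3be1374b8798dfacfdce)"
          + " switched from DEPLOYING to INITIALIZING.",
      "INFO: Unbounded Flink Source 1/2 is reading from sources:"
          + " [UnboundedEventSource(0, 1000), UnboundedEventSource(2000, 3000)]",
      "INFO: Source: Query0.ReadUnbounded -> ... (2/2)#0 (6cb95a324b37d5272f769a25e93e5906)"
          + " switched from INITIALIZING to RUNNING.");

  // Shortened lines from the "after" excerpt.
  static final List<String> AFTER = List.of(
      "INFO: Source: Query0.ReadUnbounded -> ... (2/2) (eee384163299676ced9b2a87bf24bbca)"
          + " switched from DEPLOYING to INITIALIZING.",
      "WARNING: AutoWatermarkInterval is not set, watermarks won't be emitted.",
      "INFO: Source: Query0.ReadUnbounded -> ... (2/2) (eee384163299676ced9b2a87bf24bbca)"
          + " switched from INITIALIZING to RUNNING.",
      "INFO: Assigned splits [[SplitIndex: 0, BeamSource: UnboundedEventSource(0, 1000)],"
          + " [SplitIndex: 2, BeamSource: UnboundedEventSource(2000, 3000)]]");

  public static void main(String[] args) {
    System.out.println("before: " + stateAtFirstSplitLog(BEFORE));
    System.out.println("after:  " + stateAtFirstSplitLog(AFTER));
    System.out.println("same ranges: " + ranges(BEFORE).equals(ranges(AFTER)));
  }
}
```

On the shortened samples, the helper reports INITIALIZING for the old path and RUNNING for the new one. Both runs list the same two ranges. On full logs it would need per-subtask handling to be conclusive.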
