nbali edited a comment on pull request #16773:
URL: https://github.com/apache/beam/pull/16773#issuecomment-1043786072


   @kennknowles 
   > One problem here is that there is no batch implementation for 
`UnboundedSource` and also batch execution of an unbounded splittable `DoFn` 
(where each element may produce unbounded output).
   > 
   > This is an example where finite != bounded. It is up to the IO to produce 
a bounded PCollection, and the runner to figure out how to execute it. But 
batch Dataflow will never be able to evaluate an unbounded PCollection.
   
   This may be a pleasant surprise, then: with `DataflowRunner`, a pipeline that 
has an unbounded collection (I'm guessing because `ReadFromKafkaDoFn` is 
annotated with `@UnboundedPerElement`) but a finite amount of data (because of 
`KafkaIO.Read.withStopReadTime`) stops cleanly in both batch and streaming mode. 
Apart from the obvious differences inherent to batch vs. streaming, the biggest 
differences are execution time and cost: processing the same data takes longer 
and costs more in streaming. This PR intends to fix that.
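   
   To make "finite but declared unbounded" concrete, here is a toy model in 
plain Java. It is only a sketch and does not use the Beam API: `Declared`, 
`TimeLimitedSource` and `drain` are hypothetical names invented for this 
illustration, and the one-record-per-second source is an assumption.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class FiniteUnboundedDemo {
    // Toy stand-in for a collection's declared boundedness; NOT the Beam enum.
    enum Declared { BOUNDED, UNBOUNDED }

    // Hypothetical source: endless in principle (one record per second),
    // but it stops yielding once the record timestamp reaches the stop time.
    static final class TimeLimitedSource implements Iterator<Instant> {
        // What a runner would see up front, before reading any data.
        final Declared declared = Declared.UNBOUNDED;
        private Instant next;
        private final Instant stop;

        TimeLimitedSource(Instant start, Instant stop) {
            this.next = start;
            this.stop = stop;
        }

        @Override
        public boolean hasNext() {
            return next.isBefore(stop);
        }

        @Override
        public Instant next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Instant current = next;
            next = next.plusSeconds(1);
            return current;
        }
    }

    // Reads until the source is exhausted; terminates because of the stop time.
    static List<Instant> drain(TimeLimitedSource source) {
        List<Instant> out = new ArrayList<>();
        while (source.hasNext()) {
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2022-02-01T00:00:00Z");
        TimeLimitedSource source = new TimeLimitedSource(start, start.plusSeconds(10));
        System.out.println("declared: " + source.declared);
        System.out.println("records read: " + drain(source).size());
    }
}
```

   The declared type says nothing about whether reading terminates; the stop 
time does. That is the gap between the runner's autodetection and what the job 
does in practice.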
   
   Actual data from the GCP console for two jobs launched at the same time with 
`KafkaIO.Read.withStartReadTime` and `KafkaIO.Read.withStopReadTime`, against 
the same topic/data and with the same configuration/workers/etc., with 
batch/streaming being the only difference:
   | Name | Value |
   | - | - |
   | Job type | Batch |
   | Job status | Succeeded |
   | Elapsed time | 5 min 46 sec |
   | Total memory time | 0,15 GB hr |
   | Total Shuffle data processed | 81,43 MB |
   | Billable Shuffle data processed | 20,36 MB |
   
   | Name | Value |
   | - | - |
   | Job type | Streaming |
   | Job status | Succeeded |
   | Elapsed time | 6 min 42 sec |
   | Total memory time | 0,226 GB hr |
   | Total streaming data processed | 121,47 MB |
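   
   As a rough sketch of the arithmetic behind these two tables, the relative 
overhead of the streaming job can be computed like this. The figures are copied 
from the tables above; `JobComparison` and `percentIncrease` are illustrative 
names only, and no pricing is assumed.

```java
import java.time.Duration;

final class JobComparison {
    // Percentage by which the streaming figure exceeds the batch figure.
    static double percentIncrease(double batch, double streaming) {
        return (streaming - batch) / batch * 100.0;
    }

    public static void main(String[] args) {
        // Elapsed time as reported in the GCP console.
        Duration batchElapsed = Duration.ofMinutes(5).plusSeconds(46);
        Duration streamingElapsed = Duration.ofMinutes(6).plusSeconds(42);

        // Total memory time in GB hr (decimal commas converted to points).
        double batchMemoryGbHr = 0.15;
        double streamingMemoryGbHr = 0.226;

        System.out.printf("Elapsed time: +%.1f%%%n",
            percentIncrease(batchElapsed.getSeconds(), streamingElapsed.getSeconds()));
        System.out.printf("Memory time:  +%.1f%%%n",
            percentIncrease(batchMemoryGbHr, streamingMemoryGbHr));
    }
}
```

   The data-processed rows are left out on purpose: batch reports Shuffle data 
and streaming reports streaming data, so they are not directly comparable 
without the pricing for each.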
   
   The price difference seems significant, but I haven't tested with a **big** 
amount of data, so it might become more linear at scale. The biggest PitA for 
us, though, is the lack of batch processing with simple grouping, etc., and the 
need to handle the data as streaming with windows, triggers, etc., when 
business-wise that is absolutely NOT necessary.
   
   > Disabling the autodetection would have to have a scan for unbounded 
PCollections and then fail the job before submitting it.
   
   So, based on the actual behaviour, I respectfully disagree with your 
statement, because it actually works. I introduced it as an experiment, so 
anyone who knows what they are doing can turn the autodetection off, while it 
stays on for everybody else "just to be sure".
   
   > Specifically: [#15951 
(comment)](https://github.com/apache/beam/pull/15951#issuecomment-990238052)
   
   I agree that this should be the "proper" solution for this "specific" issue, 
but this workaround handles not only that but every similar issue as well.

