+dev

On Mon, Sep 19, 2016 at 4:37 PM, Dan Halperin <[email protected]> wrote:
> Hey folks,
>
> Sorry for the confusion around sinks. Let me see if I can clear things up.
>
> In Beam, a Source+Reader is a very integral part of the model. A source is the root of a pipeline and it is where runners can do lots of important things like creating bundles, producing watermarks, and taking checkpoints.
>
> So we initially thought that we should have a generic Write transform that would encapsulate common patterns when outputting data. We started with Write+Sink.
>
> The Write+Sink class is simply a wrapper for the pattern (1-time global initialization, parallel writing, 1-time global finalization), but it is nothing more than a bunch of ParDos wired together by side inputs for control flow. However, this pattern ONLY applies naturally to PCollections that trigger exactly 1 time across all windows -- really, only bounded PCollections. So it simply does not work with unbounded PCollections.
>
> Over time, we have learned that this was not a very general pattern. Specifically, I believe there is exactly 1 use of the Sink today, which is for files (FileBasedSink, and also HadoopFileSink). The global pattern for FileBasedSink looks like (1 time, create temp directory; many times, write a temp file; 1 time, rename and renumber all files).
>
> Most other places we write data need less structure. Kafka, Google Cloud Datastore, Google Cloud Bigtable -- most of the time you simply insert one record or a bundle of records in a transaction, but you don't have global multi-transaction support across many bundles. These are implemented purely as DoFns, because we can encapsulate bundle-based batching or transactions at that level. See BigtableIO <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L538> or DatastoreIO <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L896> for examples.
>
> Other times, we actually want *more* structure. For BigQueryIO in bounded mode, we: 1) write all files in parallel; 2) 1-time globally break temp tables into groups that fit within 1 BigQuery import job; 3) create multiple temp tables in BigQuery in parallel; 4) concatenate all the temp tables into the final table; 5) delete all the temp files and the temp tables.
>
> The fact that Write+Sink has not proved that useful has slowed down our movement on any sort of unbounded sink or streaming sink. Instead, I think it makes more sense to identify the important patterns and build to those.
>
> The Flink Rolling[File]Sink is a specific kind of sink that we should incorporate into the SDK core alongside TextIO. But the key is that RollingSink is not "streaming TextIO.Write" -- it's actually a file sink with specific semantics that are different from those of TextIO.Write.
>
> Similarly, we could extend TextIO.Write to support a file pattern like "-WW-SS-of-NN-GG", where "WW" is based on the window, "SS of NN" is the index of a particular shard within a write, and "GG" is the generation number, aka the number of times this Window+Shard combination has triggered (based on PaneInfo). And of course, to support this we'd have to force a fixed sharding pattern all the time when writing, which hurts efficiency and runner autotuning. In general, this is hard to do right in a way that makes sense with late data, accumulating mode, etc.
>
> (If you could enforce 1 triggering per window, we could drop the -GG.)
>
> Hope that helps explain why we don't have any awesome streaming sinks yet in the SDK. However, we'd love for someone to start a Beam port of RollingFileSink or something similar which has sensible semantics in the presence of windowing, triggers, multiple firing, accumulating mode... (Possibly just rejecting some inputs it can't handle.)
>
> Thanks!
> Dan
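To make the "plain DoFn" write pattern above concrete, here is a minimal sketch of a bundle-batching writer in the spirit of BigtableIO and DatastoreIO. MyDbClient (with connect/writeAll/close) is a hypothetical placeholder rather than a real Beam or vendor API, and the annotated-DoFn method signatures may differ slightly between SDK versions; real connectors also add retries and error handling.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.beam.sdk.transforms.DoFn;

    /** Sketch of a sink written as a plain DoFn: buffer per bundle, flush in @FinishBundle. */
    class BatchingWriteFn extends DoFn<String, Void> {
      private static final int MAX_BATCH_SIZE = 500;

      private transient MyDbClient client;      // hypothetical external client
      private transient List<String> batch;

      @Setup
      public void setup() {
        client = MyDbClient.connect();          // hypothetical: one connection per DoFn instance
      }

      @StartBundle
      public void startBundle() {
        batch = new ArrayList<>();              // fresh buffer for every bundle
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        batch.add(c.element());                 // buffer the element
        if (batch.size() >= MAX_BATCH_SIZE) {
          flush();                              // bound memory by flushing large batches early
        }
      }

      @FinishBundle
      public void finishBundle() {
        flush();                                // everything buffered in this bundle is written here
      }

      @Teardown
      public void teardown() {
        client.close();
      }

      private void flush() {
        if (!batch.isEmpty()) {
          client.writeAll(batch);               // hypothetical bulk write / single transaction
          batch.clear();
        }
      }
    }

Applied with ParDo.of(new BatchingWriteFn()), this works for bounded and unbounded PCollections alike, which is exactly why these connectors did not need Write+Sink.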
> On Mon, Sep 19, 2016 at 2:11 PM, Jean-Baptiste Onofré <[email protected]> wrote:
>
>> Hi Thomas,
>>
>> thanks for the update.
>>
>> Stupid question: right now, most of the IO sinks use a simple DoFn (to write the PCollection elements).
>>
>> Does it mean that, when possible, we should use the Sink abstract class, with a Writer that can write bundles (using open(), write(), close() methods)? In that case, will the Window create the bundle?
>>
>> Regards
>> JB
>>
>> On 09/19/2016 10:46 PM, Thomas Groh wrote:
>>
>>> The model doesn't support dynamically creating PCollections, so the proposed transform producing a Bounded PCollection from a window of an Unbounded PCollection means that you end up with a Bounded Pipeline -- in which case it is preferable to use a Bounded Read instead. As you're reading from an unbounded input, it may be necessary to write the windowed values to a sink that supports continuous updates, from which you can execute a Bounded Pipeline over the appropriate set of input instead of on some arbitrary chunk of records (e.g. the window).
>>>
>>> The current Write implementation works effectively only for sinks that can assume they can do something exactly once and finish. This is not an assumption that can be made within arbitrary pipelines. Sinks that currently only work for Bounded inputs are generally written with assumptions that mean they do not work in the presence of multiple trigger firings, late data, or across multiple windows, and handling these facets of the model requires different behavior or configuration from different sinks.
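For reference, the bounded Write+Sink lifecycle that Dan describes and JB asks about above (one global initialization, a Writer doing open/write/close per bundle, one global finalization) has roughly the following shape. The interfaces and method names here are simplified for illustration and are not the exact Beam Sink/Writer signatures.

    import java.util.List;

    /** Illustrative skeleton of the bounded Write+Sink lifecycle (simplified names, not the exact Beam API). */
    interface SimpleSink<T> {
      // 1-time global initialization, e.g. create a temporary output directory.
      void initialize();

      // Called once per bundle; how elements are grouped into bundles is decided
      // by the runner, not by the Window.
      SimpleWriter<T> createWriter(String bundleId);

      // 1-time global finalization over all per-bundle results,
      // e.g. rename and renumber the temp files into their final names.
      void finalizeWrite(List<String> bundleResults);
    }

    interface SimpleWriter<T> {
      void open();             // per bundle: open a temp file
      void write(T element);   // per element in the bundle
      String close();          // per bundle: close the temp file and return its name
    }

In Beam this control flow is stitched together from ParDos and side inputs, which is why it only applies when the input triggers exactly once across all windows, i.e. to bounded PCollections.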
>>> On Mon, Sep 19, 2016 at 9:53 AM, Emanuele Cesena <[email protected]> wrote:
>>>
>>> Oh yes, even better I’d say.
>>>
>>> Best,
>>>
>>> > On Sep 19, 2016, at 9:48 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>> >
>>> > Hi Emanuele,
>>> >
>>> > +1 to support an unbounded sink, but also, a very convenient function would be a Window to create a bounded collection as a subset of an unbounded collection.
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 09/19/2016 05:59 PM, Emanuele Cesena wrote:
>>> >> Hi,
>>> >>
>>> >> This is a great insight. Is there any plan to support unbounded sinks in Beam?
>>> >>
>>> >> On the temp kafka->kafka solution, this is exactly what we’re doing (and I wish to change). We have production stream pipelines that are kafka->kafka. Then we have 2 main use cases: Kafka Connect to dump into Hive and go batch from there, and Druid for real-time reporting.
>>> >>
>>> >> However this makes prototyping really slow, and I wanted to introduce Beam to shortcut from Kafka to anywhere.
>>> >>
>>> >> Best,
>>> >>
>>> >>> On Sep 18, 2016, at 10:38 PM, Aljoscha Krettek <[email protected]> wrote:
>>> >>>
>>> >>> Hi,
>>> >>> right now, writing to a Beam "Sink" is only supported for bounded streams, as you discovered. An unbounded stream cannot be transformed to a bounded stream using a window; this will just "chunk" the stream differently, but it will still be unbounded.
>>> >>>
>>> >>> The options you have right now for writing are to write to your external datastore using a DoFn, to use KafkaIO to write to a Kafka topic, or to use UnboundedFlinkSink to wrap a Flink Sink for use in a Beam pipeline. The latter would allow you to use, for example, BucketingSink or RollingSink from Flink. I'm only mentioning UnboundedFlinkSink for completeness; I would not recommend using it, since your program will only work on the Flink runner. The way to go, IMHO, would be to write to Kafka and then take the data from there and ship it to some final location such as HDFS.
>>> >>>
>>> >>> Cheers,
>>> >>> Aljoscha
>>> >>>
>>> >>> On Sun, 18 Sep 2016 at 23:17 Emanuele Cesena <[email protected]> wrote:
>>> >>> Thanks, I’ll look into it, even if it’s not really the feature I need (exactly because it will stop execution).
>>> >>>
>>> >>>> On Sep 18, 2016, at 2:11 PM, Chawla,Sumit <[email protected]> wrote:
>>> >>>>
>>> >>>> Hi Emanuele
>>> >>>>
>>> >>>> KafkaIO supports withMaxNumRecords(X), which will create a bounded source from Kafka. However, the pipeline will finish once X number of records are read.
>>> >>>>
>>> >>>> Regards
>>> >>>> Sumit Chawla
>>> >>>>
>>> >>>> On Sun, Sep 18, 2016 at 2:00 PM, Emanuele Cesena <[email protected]> wrote:
>>> >>>> Hi,
>>> >>>>
>>> >>>> Thanks for the hint - I’ll debug better, but I thought I did that:
>>> >>>> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java#L140
>>> >>>>
>>> >>>> Best,
>>> >>>>
>>> >>>>> On Sep 18, 2016, at 1:54 PM, Jean-Baptiste Onofré <[email protected]> wrote:
>>> >>>>>
>>> >>>>> Hi Emanuele
>>> >>>>>
>>> >>>>> You have to use a window to create a bounded collection from an unbounded source.
>>> >>>>>
>>> >>>>> Regards
>>> >>>>> JB
>>> >>>>>
>>> >>>>> On Sep 18, 2016, at 21:04, Emanuele Cesena <[email protected]> wrote:
>>> >>>>> Hi,
>>> >>>>>
>>> >>>>> I wrote a while ago about a simple example I was building to test KafkaIO:
>>> >>>>> https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java
>>> >>>>>
>>> >>>>> Issues with Flink should be fixed now, and I’m trying to run the example on master and Flink 1.1.2.
>>> >>>>> I’m currently getting:
>>> >>>>> Caused by: java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection
>>> >>>>>
>>> >>>>> What is the recommended way to go here?
>>> >>>>> - is there a way to create a bounded collection from an unbounded one?
>>> >>>>> - is there a plan to let TextIO write unbounded collections?
>>> >>>>> - is there another recommended “simple sink” to use?
>>> >>>>>
>>> >>>>> Thank you much!
>>> >>>>>
>>> >>>>> Best,
>>> >>>>
>>> >>>> --
>>> >>>> Emanuele Cesena, Data Eng.
>>> >>>> http://www.shopkick.com
>>> >>>>
>>> >>>> Il corpo non ha ideali
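As a footnote, the withMaxNumRecords(X) workaround Sumit mentions above would look roughly like this. The sketch uses the later KafkaIO builder style (the exact method names differ across Beam releases), and the broker address, topic, and output path are placeholders:

    import java.util.Collections;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class BoundedKafkaToText {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")              // placeholder broker
                .withTopics(Collections.singletonList("words"))      // placeholder topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withMaxNumRecords(1000)   // makes the source bounded; the pipeline finishes after 1000 records
                .withoutMetadata())        // drop Kafka metadata, keep KV<key, value>
            .apply(Values.<String>create())            // keep only the record values
            .apply(TextIO.write().to("/tmp/words"));   // a bounded write is now allowed

        p.run().waitUntilFinish();
      }
    }

As Sumit notes, this turns the stream into a finite read, so it is useful for prototyping rather than as a real streaming sink.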
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
