Hi Kenneth,

Thanks for looking into it. I am currently trying to implement Sinks for writing data into Cassandra/Titan DB. My immediate goal is to run them on the Flink Runner.
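To make that concrete, here is the rough shape of the write step I have in mind, following the suggestion to do the writing in a plain ParDo. This is only a sketch: the table, keyspace, and String element type are placeholders for my real record type, and I have not run it yet.

import java.util.UUID;

import org.apache.beam.sdk.transforms.DoFn;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

// Sketch only: the "messages" table, keyspace, and String elements are
// placeholders for whatever my real record type ends up being.
class CassandraWriteFn extends DoFn<String, Void> {
  private final String contactPoint;
  private final String keyspace;
  private transient Cluster cluster;
  private transient Session session;
  private transient PreparedStatement insert;

  CassandraWriteFn(String contactPoint, String keyspace) {
    this.contactPoint = contactPoint;
    this.keyspace = keyspace;
  }

  @Override
  public void startBundle(Context c) throws Exception {
    // Open one session per bundle instead of per element.
    cluster = Cluster.builder().addContactPoint(contactPoint).build();
    session = cluster.connect(keyspace);
    insert = session.prepare("INSERT INTO messages (id, body) VALUES (?, ?)");
  }

  @Override
  public void processElement(ProcessContext c) throws Exception {
    session.execute(insert.bind(UUID.randomUUID(), c.element()));
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    session.close();
    cluster.close();
  }
}

Opening the session in startBundle rather than per element should amortize the connection cost across each bundle.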
Regards
Sumit Chawla

On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <[email protected]> wrote:

> Hi Sumit,
>
> I see what has happened here, from that snippet you pasted from the Flink
> runner's code [1]. Thanks for looking into it!
>
> The Flink runner today appears to reject Write.Bounded transforms in
> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> intent of that code, I believe, was to special-case UnboundedFlinkSink to
> make it easy to use an existing Flink sink, not to disable all other
> Write transforms. What do you think, Max?
>
> Until we fix this issue, you should use ParDo transforms to do the
> writing. If you can share a little about your sink, we may be able to
> suggest patterns for implementing it. Like Eugene said, the Write.of(Sink)
> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>
> Kenn
>
> [1]
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>
> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <[email protected]> wrote:
>
> > Thanks Sumit. Looks like your question is, indeed, specific to the Flink
> > runner, and I'll then defer to somebody familiar with it.
> >
> > On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <[email protected]> wrote:
> >
> > > Thanks a lot, Eugene.
> > >
> > > >>> My immediate requirement is to run this Sink on FlinkRunner, which
> > > >>> mandates that my implementation must also implement SinkFunction<>.
> > > >>> In that case, none of the Sink<> methods get called anyway.
> > >
> > > I am using the FlinkRunner. The Sink implementation that I was writing
> > > by extending the Sink<> class had to implement the Flink-specific
> > > SinkFunction for the correct translation:
> > >
> > > private static class WriteSinkStreamingTranslator<T> implements
> > >     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
> > >
> > >   @Override
> > >   public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
> > >     String name = transform.getName();
> > >     PValue input = context.getInput(transform);
> > >
> > >     Sink<T> sink = transform.getSink();
> > >     if (!(sink instanceof UnboundedFlinkSink)) {
> > >       throw new UnsupportedOperationException(
> > >           "At the time, only unbounded Flink sinks are supported.");
> > >     }
> > >
> > >     DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
> > >
> > >     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
> > >       // Unwrap the windowed value before handing it to the Flink sink.
> > >       @Override
> > >       public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
> > >         out.collect(value.getValue());
> > >       }
> > >     }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
> > >   }
> > > }
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <[email protected]> wrote:
> > >
> > > > Hi Sumit,
> > > >
> > > > All reusable parts of a pipeline, including connectors to storage
> > > > systems, should be packaged as PTransform's.
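That framing helps. For the Cassandra case, then, the public entry point would be a PTransform that wraps the DoFn I sketched above, along these lines (CassandraIO.Write here is a placeholder name for illustration, not an existing connector):

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public class CassandraIO {
  public static class Write extends PTransform<PCollection<String>, PDone> {
    private final String contactPoint;
    private final String keyspace;

    public Write(String contactPoint, String keyspace) {
      this.contactPoint = contactPoint;
      this.keyspace = keyspace;
    }

    @Override
    public PDone apply(PCollection<String> input) {
      // The transform is just a thin wrapper around the writing ParDo, so
      // the implementation can change later without breaking callers.
      input.apply(ParDo.of(new CassandraWriteFn(contactPoint, keyspace)));
      return PDone.in(input.getPipeline());
    }
  }
}

A pipeline would then just do messages.apply(new CassandraIO.Write("cassandra-host", "mykeyspace")), and the ParDo stays an implementation detail.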
> > > > Sink is an advanced API that you can use under the hood to implement
> > > > the transform, if this particular connector benefits from this API -
> > > > but you don't have to. Many connectors indeed don't need it, and are
> > > > simpler to implement just as wrappers around a couple of ParDo's
> > > > writing the data.
> > > >
> > > > Even if the connector is implemented using a Sink, packaging the
> > > > connector as a PTransform is important because it's easier to apply
> > > > in a pipeline and because it's more future-proof (the author of the
> > > > connector may later change it to use something else rather than Sink
> > > > under the hood without breaking existing users).
> > > >
> > > > Sink is, currently, useful in the following cases:
> > > > - You're writing a bounded amount of data (we do not yet have an
> > > >   unbounded Sink analogue).
> > > > - The location you're writing to is known at pipeline construction
> > > >   time and does not depend on the data itself (support for
> > > >   "data-dependent" sinks is on the radar:
> > > >   https://issues.apache.org/jira/browse/BEAM-92).
> > > > - The storage system you're writing to has distinct "initialization"
> > > >   and "finalization" steps, allowing the write operation to appear
> > > >   atomic (either all data is written or none). This mostly applies
> > > >   to files (where writing is done by first writing to a temporary
> > > >   directory, and then renaming all files to their final location),
> > > >   but there can be other cases too.
> > > >
> > > > Here's an example GCP connector using the Sink API under the hood:
> > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > > > Most other non-file-based connectors, indeed, don't use it (KafkaIO,
> > > > DatastoreIO, BigtableIO, etc.).
> > > >
> > > > I'm not familiar with the Flink API; however, I'm a bit confused by
> > > > your last paragraph: the Beam programming model is intentionally
> > > > runner-agnostic, so that you can run exactly the same code on
> > > > different runners.
> > > >
> > > > On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <[email protected]> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Please suggest the best way to write a Sink in Beam. I see that
> > > > > there is a Sink<T> abstract class which is in an experimental
> > > > > state. What is the expected outcome of this one? Is the API
> > > > > frozen, or could this still change? Most of the existing Sink
> > > > > implementations like KafkaIO.Write are not using this interface,
> > > > > and instead extend PTransform<PCollection<KV<K, V>>, PDone>. Would
> > > > > these be changed to extend Sink<>?
> > > > >
> > > > > My immediate requirement is to run this Sink on the FlinkRunner,
> > > > > which mandates that my implementation must also implement
> > > > > SinkFunction<>. In that case, none of the Sink<> methods get
> > > > > called anyway.
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
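P.S. To check my understanding of the lifecycle Eugene describes: my reading of the experimental Sink<T> class is the skeleton below, where each writer's close() result is handed to finalize() for the atomic commit step. This is only how I read the current code, so the exact signatures may be off.

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.options.PipelineOptions;

// Skeleton of the three-phase lifecycle. The per-writer result type here
// is String (e.g. a temp location), which finalize() commits in one step.
class MySink<T> extends Sink<T> {
  @Override
  public void validate(PipelineOptions options) {
    // Check the destination at pipeline construction time.
  }

  @Override
  public MyWriteOperation<T> createWriteOperation(PipelineOptions options) {
    return new MyWriteOperation<>(this);
  }
}

class MyWriteOperation<T> extends Sink.WriteOperation<T, String> {
  private final MySink<T> sink;

  MyWriteOperation(MySink<T> sink) {
    this.sink = sink;
  }

  @Override
  public void initialize(PipelineOptions options) throws Exception {
    // e.g. create a temporary directory.
  }

  @Override
  public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
    // e.g. atomically move every temp location to its final place.
  }

  @Override
  public MyWriter<T> createWriter(PipelineOptions options) throws Exception {
    return new MyWriter<>(this);
  }

  @Override
  public Sink<T> getSink() {
    return sink;
  }

  @Override
  public Coder<String> getWriterResultCoder() {
    return StringUtf8Coder.of();
  }
}

class MyWriter<T> extends Sink.Writer<T, String> {
  private final MyWriteOperation<T> writeOperation;
  private String tempLocation;

  MyWriter(MyWriteOperation<T> writeOperation) {
    this.writeOperation = writeOperation;
  }

  @Override
  public void open(String uId) throws Exception {
    // Open a destination unique to this writer attempt.
    tempLocation = "/tmp/" + uId;
  }

  @Override
  public void write(T value) throws Exception {
    // Append value to the temp location.
  }

  @Override
  public String close() throws Exception {
    return tempLocation; // Becomes one of the writerResults in finalize().
  }

  @Override
  public Sink.WriteOperation<T, String> getWriteOperation() {
    return writeOperation;
  }
}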
