Thanks, Sumit. It looks like your question is indeed specific to the Flink
runner, so I'll defer to somebody familiar with it.

On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <[email protected]> wrote:

> Thanks a lot Eugene.
>
> > My immediate requirement is to run this Sink on FlinkRunner. Which
> > mandates that my implementation must also implement SinkFunction<>.  In
> > that case, none of the Sink<> methods get called anyway.
>
> I am using FlinkRunner. The Sink implementation that I was writing by
> extending the Sink<> class had to implement the Flink-specific SinkFunction
> for correct translation.
>
> private static class WriteSinkStreamingTranslator<T> implements
>     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
>
>   @Override
>   public void translateNode(Write.Bound<T> transform,
>       FlinkStreamingTranslationContext context) {
>     String name = transform.getName();
>     PValue input = context.getInput(transform);
>
>     Sink<T> sink = transform.getSink();
>     if (!(sink instanceof UnboundedFlinkSink)) {
>       throw new UnsupportedOperationException(
>           "At the time, only unbounded Flink sinks are supported.");
>     }
>
>     DataStream<WindowedValue<T>> inputDataSet =
>         context.getInputDataStream(input);
>
>     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
>       @Override
>       public void flatMap(WindowedValue<T> value, Collector<Object> out)
>           throws Exception {
>         out.collect(value.getValue());
>       }
>     }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
>   }
> }
>
>
>
>
> Regards
> Sumit Chawla
>
>
> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> [email protected]> wrote:
>
> > Hi Sumit,
> >
> > All reusable parts of a pipeline, including connectors to storage
> > systems, should be packaged as PTransform's.
> >
> > Sink is an advanced API that you can use under the hood to implement the
> > transform, if this particular connector benefits from this API - but you
> > don't have to, and many connectors indeed don't need it, and are simpler
> > to implement just as wrappers around a couple of ParDo's writing the data.
> >
> > Even if the connector is implemented using a Sink, packaging the
> > connector as a PTransform is important because it's easier to apply in a
> > pipeline and because it's more future-proof (the author of the connector
> > may later change it to use something else rather than Sink under the hood
> > without breaking existing users).
> >
> > Sink is, currently, useful in the following case:
> > - You're writing a bounded amount of data (we do not yet have an
> > unbounded Sink analogue)
> > - The location you're writing to is known at pipeline construction time,
> > and does not depend on the data itself (support for "data-dependent"
> > sinks is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> > - The storage system you're writing to has a distinct "initialization"
> > and "finalization" step, allowing the write operation to appear atomic
> > (either all data is written or none). This mostly applies to files (where
> > writing is done by first writing to a temporary directory, and then
> > renaming all files to their final location), but there can be other cases
> > too.
> >
> > Here's an example GCP connector using the Sink API under the hood:
> >
> > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> >
> > Most other non-file-based connectors, indeed, don't (KafkaIO,
> > DatastoreIO, BigtableIO etc.)
> >
> > I'm not familiar with the Flink API, however I'm a bit confused by your
> > last paragraph: the Beam programming model is intentionally
> > runner-agnostic, so that you can run exactly the same code on different
> > runners.
> >
> > On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <[email protected]>
> > wrote:
> >
> > > Hi
> > >
> > > Please suggest the best way to write a Sink in Beam.  I see that
> > > there is a Sink<T> abstract class which is in an experimental state.
> > > What is the expected outcome of this one? Is the API frozen, or could
> > > it still change?  Most of the existing sink implementations, like
> > > KafkaIO.Write, do not use this interface and instead extend
> > > PTransform<PCollection<KV<K, V>>, PDone>. Will these be changed to
> > > extend Sink<>?
> > >
> > >
> > > My immediate requirement is to run this Sink on FlinkRunner. Which
> > > mandates that my implementation must also implement SinkFunction<>.
> > > In that case, none of the Sink<> methods get called anyway.
> > >
> > > Regards
> > > Sumit Chawla
> > >
> >
>
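
For illustration, the "write to a temporary directory, then rename all
files to their final location" pattern that the quoted message describes for
file-based sinks could be sketched in plain Java roughly as follows. This is
not Beam's Sink API and does not depend on Beam or Flink: the class
TempThenRenameWriter, its method names, and the "output" directory name are
all hypothetical, chosen only to mirror the initialization, write, and
finalization steps mentioned above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a temp-then-rename write; NOT Beam's Sink API. */
final class TempThenRenameWriter {
    private final Path baseDir;
    private final Path outputDir;                 // final location (assumed name "output")
    private Path tempDir;                         // created by initialize()
    private final List<Path> written = new ArrayList<>();

    TempThenRenameWriter(Path baseDir) {
        this.baseDir = baseDir;
        this.outputDir = baseDir.resolve("output");
    }

    /** Initialization step: create a scratch directory beside the final one. */
    void initialize() throws IOException {
        tempDir = Files.createTempDirectory(baseDir, "tmp-write-");
    }

    /** Write step: each bundle of records goes to its own temporary file. */
    void writeBundle(String shardName, List<String> records) throws IOException {
        Path tmp = tempDir.resolve(shardName);
        Files.write(tmp, records, StandardCharsets.UTF_8);
        written.add(tmp);
    }

    /** Finalization step: move every temporary file to the final location. */
    List<Path> finalizeWrite() throws IOException {
        Files.createDirectories(outputDir);
        List<Path> results = new ArrayList<>();
        for (Path tmp : written) {
            Path target = outputDir.resolve(tmp.getFileName());
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
            results.add(target);
        }
        written.clear();
        Files.delete(tempDir);                    // scratch directory is empty now
        return results;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("sink-sketch-");
        TempThenRenameWriter writer = new TempThenRenameWriter(base);
        writer.initialize();
        writer.writeBundle("shard-0", Arrays.asList("a", "b"));
        writer.writeBundle("shard-1", Arrays.asList("c"));
        System.out.println("Finalized: " + writer.finalizeWrite());
    }
}
```

Nothing appears under the final location until finalizeWrite() runs, which
is what lets the write look atomic to readers of that directory. Note that
the files are moved one at a time, so a failure partway through
finalization would still need cleanup or a retry.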
