Thanks a lot Eugene.

>>> My immediate requirement is to run this Sink on FlinkRunner, which
>>> mandates that my implementation must also implement SinkFunction<>.  In
>>> that case, none of the Sink<> methods get called anyway.

I am using FlinkRunner. The Sink implementation that I was writing by
extending the Sink<> class also had to implement the Flink-specific
SinkFunction for the translation to work correctly.

private static class WriteSinkStreamingTranslator<T>
    implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {

  @Override
  public void translateNode(
      Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
    String name = transform.getName();
    PValue input = context.getInput(transform);

    Sink<T> sink = transform.getSink();
    if (!(sink instanceof UnboundedFlinkSink)) {
      throw new UnsupportedOperationException(
          "At the time, only unbounded Flink sinks are supported.");
    }

    DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);

    inputDataSet
        .flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
          @Override
          public void flatMap(WindowedValue<T> value, Collector<Object> out)
              throws Exception {
            out.collect(value.getValue());
          }
        })
        .addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource())
        .name(name);
  }
}
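
To make the point about the Sink<> methods concrete, here is a minimal,
self-contained sketch of the same unwrap-and-forward pattern. All types in
it (Sink, SinkFunction, UnboundedFlinkSink, TranslationSketch) are
hypothetical stand-ins written for illustration, not the real Beam or Flink
classes, and the counter exists only to show that the translator never
touches the Sink itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TranslationSketch {

  // Hypothetical stand-in for Beam's Sink<T>; counts calls to its own method.
  static abstract class Sink<T> {
    int initializeCalls = 0;

    void initialize() {
      initializeCalls++;
    }
  }

  // Hypothetical stand-in for Flink's SinkFunction<T>.
  interface SinkFunction<T> {
    void invoke(T value);
  }

  // Hypothetical stand-in for UnboundedFlinkSink: a Sink that only carries a Flink function.
  static class UnboundedFlinkSink<T> extends Sink<T> {
    private final SinkFunction<T> flinkSink;

    UnboundedFlinkSink(SinkFunction<T> flinkSink) {
      this.flinkSink = flinkSink;
    }

    SinkFunction<T> getFlinkSource() {
      return flinkSink;
    }
  }

  // Same shape as translateNode above: check the type, unwrap, forward each element.
  static <T> void translate(Sink<T> sink, List<T> elements) {
    if (!(sink instanceof UnboundedFlinkSink)) {
      throw new UnsupportedOperationException("Only unbounded Flink sinks are supported.");
    }
    SinkFunction<T> fn = ((UnboundedFlinkSink<T>) sink).getFlinkSource();
    for (T element : elements) {
      fn.invoke(element);
    }
  }

  public static void main(String[] args) {
    List<String> written = new ArrayList<>();
    UnboundedFlinkSink<String> sink = new UnboundedFlinkSink<>(written::add);
    translate(sink, Arrays.asList("a", "b"));
    // Prints: [a, b] initializeCalls=0
    System.out.println(written + " initializeCalls=" + sink.initializeCalls);
  }
}
```

In this sketch the elements reach the wrapped function, while the counter on
the Sink stays at zero, which is the behaviour described above.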




Regards
Sumit Chawla


On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
[email protected]> wrote:

> Hi Sumit,
>
> All reusable parts of a pipeline, including connectors to storage systems,
> should be packaged as PTransforms.
>
> Sink is an advanced API that you can use under the hood to implement the
> transform, if this particular connector benefits from this API - but you
> don't have to, and many connectors indeed don't need it, and are simpler to
> implement just as wrappers around a couple of ParDos writing the data.
>
> Even if the connector is implemented using a Sink, packaging the connector
> as a PTransform is important because it's easier to apply in a pipeline and
> because it's more future-proof (the author of the connector may later
> change it to use something else rather than Sink under the hood without
> breaking existing users).
>
> Sink is, currently, useful in the following case:
> - You're writing a bounded amount of data (we do not yet have an unbounded
> Sink analogue)
> - The location you're writing to is known at pipeline construction time,
> and does not depend on the data itself (support for "data-dependent" sinks
> is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> - The storage system you're writing to has a distinct "initialization" and
> "finalization" step, allowing the write operation to appear atomic (either
> all data is written or none). This mostly applies to files (where writing
> is done by first writing to a temporary directory, and then renaming all
> files to their final location), but there can be other cases too.
>
> Here's an example GCP connector using the Sink API under the hood:
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> Most other non-file-based connectors, indeed, don't (KafkaIO, DatastoreIO,
> BigtableIO etc.)
>
> I'm not familiar with the Flink API, however I'm a bit confused by your
> last paragraph: the Beam programming model is intentionally
> runner-agnostic, so that you can run exactly the same code on different
> runners.
>
> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <[email protected]>
> wrote:
>
> > Hi
> >
> > Please suggest the best way to write a Sink in Beam.  I see
> > that there is a Sink<T> abstract class which is in an experimental state.
> > What is the expected outcome of this one? Is the API frozen, or
> > could this still change?  Most of the existing Sink implementations like
> > KafkaIO.Write do not use this interface, and instead extend
> > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
> > extend Sink<>?
> >
> >
> > My immediate requirement is to run this Sink on FlinkRunner, which
> > mandates that my implementation must also implement SinkFunction<>.  In
> > that case, none of the Sink<> methods get called anyway.
> >
> > Regards
> > Sumit Chawla
> >
>
