Hi Kenneth

Thanks for looking into it. I am currently trying to implement sinks for
writing data into Cassandra and Titan DB.  My immediate goal is to run them
on the Flink runner.
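
A minimal sketch of the ParDo-style writing suggested below, in plain Java
with the Beam types left out so that it runs standalone. RecordStore,
BatchingWriter and their method names are hypothetical stand-ins, not Beam,
Flink or Cassandra APIs; in a real pipeline the three methods would roughly
correspond to a DoFn's bundle-start, per-element and bundle-finish hooks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a storage client (for example a Cassandra session wrapper). */
interface RecordStore<T> {
  void writeBatch(List<T> batch);
}

/**
 * Plain-Java sketch of the lifecycle a writing DoFn would follow:
 * startBundle, then processElement once per element, then finishBundle.
 */
final class BatchingWriter<T> {
  private final RecordStore<T> store;
  private final int maxBatchSize;
  private final List<T> buffer = new ArrayList<>();

  BatchingWriter(RecordStore<T> store, int maxBatchSize) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be at least 1");
    }
    this.store = store;
    this.maxBatchSize = maxBatchSize;
  }

  /** Called once before a bundle of elements; starts from an empty buffer. */
  void startBundle() {
    buffer.clear();
  }

  /** Buffers one element and flushes as soon as the batch is full. */
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Called once after a bundle; writes whatever is still buffered. */
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Hand the store its own copy so clearing the buffer cannot affect it.
    store.writeBatch(new ArrayList<>(buffer));
    buffer.clear();
  }
}
```

Flushing in finishBundle means no element stays buffered once a bundle
completes. Whether the writes are idempotent under retries is a separate
question that depends on the store.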

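On the file case Eugene describes further down (write to a temporary
location, then rename into place), a minimal single-file sketch using only
java.nio.file could look like this. TempThenRename and writeAtomically are
hypothetical names, not part of Beam's Sink API, and the sketch covers one
file rather than a directory of output files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

final class TempThenRename {
  private TempThenRename() {}

  /**
   * Writes the lines to a temporary file next to the target and then renames
   * it into place, so readers see either the complete file or no file.
   */
  static void writeAtomically(Path target, List<String> lines) {
    Path dir = target.toAbsolutePath().getParent();
    Path temp = null;
    try {
      // "Initialization": the temp file lives in the target's directory so
      // the rename below stays on one filesystem.
      temp = Files.createTempFile(dir, "write-", ".tmp");
      Files.write(temp, lines, StandardCharsets.UTF_8);
      // "Finalization": a single rename publishes the output. What happens
      // when the target already exists is platform-specific for ATOMIC_MOVE.
      Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    } catch (IOException e) {
      if (temp != null) {
        try {
          // Best-effort cleanup of the partial output.
          Files.deleteIfExists(temp);
        } catch (IOException ignored) {
          // Nothing more to do; the original failure is reported below.
        }
      }
      throw new UncheckedIOException(e);
    }
  }
}
```

If the write fails part-way, only the temporary file is affected and the
final location is never touched.
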


Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <[email protected]>
wrote:

> Hi Sumit,
>
> I see what has happened here, from that snippet you pasted from the Flink
> runner's code [1]. Thanks for looking into it!
>
> The Flink runner today appears to reject Write.Bounded transforms in
> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> intent of that code, I believe, was to special case UnboundedFlinkSink to
> make it easy to use an existing Flink sink, not to disable all other Write
> transforms. What do you think, Max?
>
> Until we fix this issue, you should use ParDo transforms to do the writing.
> If you can share a little about your sink, we may be able to suggest
> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>
> Kenn
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>
>
> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> [email protected]> wrote:
>
> > Thanks Sumit. Looks like your question is, indeed, specific to the Flink
> > runner, and I'll then defer to somebody familiar with it.
> >
> > On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <[email protected]>
> > wrote:
> >
> > > Thanks a lot Eugene.
> > >
> > > > >>> My immediate requirement is to run this Sink on FlinkRunner, which
> > > > >>> mandates that my implementation must also implement SinkFunction<>.
> > > > >>> In that case, none of the Sink<> methods get called anyway.
> > > >
> > > > I am using FlinkRunner. The Sink implementation that I was writing by
> > > > extending the Sink<> class had to implement the Flink-specific
> > > > SinkFunction for the correct translation.
> > >
> > > > private static class WriteSinkStreamingTranslator<T> implements
> > > >     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
> > > >
> > > >   @Override
> > > >   public void translateNode(Write.Bound<T> transform,
> > > >       FlinkStreamingTranslationContext context) {
> > > >     String name = transform.getName();
> > > >     PValue input = context.getInput(transform);
> > > >
> > > >     Sink<T> sink = transform.getSink();
> > > >     if (!(sink instanceof UnboundedFlinkSink)) {
> > > >       throw new UnsupportedOperationException(
> > > >           "At the time, only unbounded Flink sinks are supported.");
> > > >     }
> > > >
> > > >     DataStream<WindowedValue<T>> inputDataSet =
> > > >         context.getInputDataStream(input);
> > > >
> > > >     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
> > > >       @Override
> > > >       public void flatMap(WindowedValue<T> value, Collector<Object> out)
> > > >           throws Exception {
> > > >         out.collect(value.getValue());
> > > >       }
> > > >     }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
> > > >   }
> > > > }
> > >
> > >
> > >
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> > > [email protected]> wrote:
> > >
> > > > Hi Sumit,
> > > >
> > > > > All reusable parts of a pipeline, including connectors to storage
> > > > > systems, should be packaged as PTransform's.
> > > > >
> > > > > Sink is an advanced API that you can use under the hood to implement
> > > > > the transform, if this particular connector benefits from this API -
> > > > > but you don't have to, and many connectors indeed don't need it, and
> > > > > are simpler to implement just as wrappers around a couple of ParDo's
> > > > > writing the data.
> > > >
> > > > > Even if the connector is implemented using a Sink, packaging the
> > > > > connector as a PTransform is important because it's easier to apply
> > > > > in a pipeline and because it's more future-proof (the author of the
> > > > > connector may later change it to use something else rather than Sink
> > > > > under the hood without breaking existing users).
> > > >
> > > > > Sink is, currently, useful in the following case:
> > > > > - You're writing a bounded amount of data (we do not yet have an
> > > > >   unbounded Sink analogue)
> > > > > - The location you're writing to is known at pipeline construction
> > > > >   time, and does not depend on the data itself (support for
> > > > >   "data-dependent" sinks is on the radar
> > > > >   https://issues.apache.org/jira/browse/BEAM-92)
> > > > > - The storage system you're writing to has a distinct "initialization"
> > > > >   and "finalization" step, allowing the write operation to appear
> > > > >   atomic (either all data is written or none). This mostly applies to
> > > > >   files (where writing is done by first writing to a temporary
> > > > >   directory, and then renaming all files to their final location), but
> > > > >   there can be other cases too.
> > > >
> > > > > Here's an example GCP connector using the Sink API under the hood:
> > > > >
> > > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > > > >
> > > > > Most other non-file-based connectors, indeed, don't (KafkaIO,
> > > > > DatastoreIO, BigtableIO etc.)
> > > >
> > > > > I'm not familiar with the Flink API, however I'm a bit confused by
> > > > > your last paragraph: the Beam programming model is intentionally
> > > > > runner-agnostic, so that you can run exactly the same code on
> > > > > different runners.
> > > >
> > > > > On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <[email protected]>
> > > > > wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > > Please suggest the best way to write a Sink in Beam.  I see that
> > > > > > there is a Sink<T> abstract class which is in an experimental state.
> > > > > > What is the expected outcome of this one? Is the API frozen, or
> > > > > > could it still change?  Most of the existing sink implementations,
> > > > > > like KafkaIO.Write, do not use this interface, and instead extend
> > > > > > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
> > > > > > extend Sink<>?
> > > > >
> > > > >
> > > > > > My immediate requirement is to run this Sink on FlinkRunner, which
> > > > > > mandates that my implementation must also implement SinkFunction<>.
> > > > > > In that case, none of the Sink<> methods get called anyway.
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > >
> > >
> >
>
