Thanks Jean

This is an interesting pattern here.  I see that its implemented as
PTransform, with constructs ( WriteOperation/Writer)  pretty similar to
Sink<T> interface.  Would love to hear more pros/cons of this pattern :) .
Definitely it gives more control over connection initialization and cleanup.

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Sumit,
>
> I created a PR containing Cassandra IO with a sink:
>
> https://github.com/apache/incubator-beam/pull/592
>
> Maybe it can help you.
>
> Regards
> JB
>
>
> On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
>
>> Hi Kenneth
>>
>> Thanks for looking into it. I am currently trying to implement Sinks for
>> writing data into Cassandra/Titan DB.  My immediate goal is to run it on
>> Flink Runner.
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <k...@google.com.invalid
>> >
>> wrote:
>>
>> Hi Sumit,
>>>
>>> I see what has happened here, from that snippet you pasted from the Flink
>>> runner's code [1]. Thanks for looking into it!
>>>
>>> The Flink runner today appears to reject Write.Bounded transforms in
>>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
>>> intent of that code, I believe, was to special case UnboundedFlinkSink to
>>> make it easy to use an existing Flink sink, not to disable all other
>>> Write
>>> transforms. What do you think, Max?
>>>
>>> Until we fix this issue, you should use ParDo transforms to do the
>>> writing.
>>> If you can share a little about your sink, we may be able to suggest
>>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
>>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>>>
>>> Kenn
>>>
>>> [1]
>>>
>>>
>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>>>
>>>
>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
>>> kirpic...@google.com.invalid> wrote:
>>>
>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>>>> runner, and I'll then defer to somebody familiar with it.
>>>>
>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkcha...@gmail.com>
>>>> wrote:
>>>>
>>>> Thanks a lot Eugene.
>>>>>
>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>>>
>>>>>>> mandates that my implementation must also implement SinkFunction<>.
>>>>> In
>>>>> that >>>case, none of the Sink<> methods get called anyway.
>>>>>
>>>>> I am using FlinkRunner. The Sink implementation that i was writing by
>>>>> extending Sink<> class had to implement Flink Specific SinkFunction for
>>>>>
>>>> the
>>>>
>>>>> correct translation.
>>>>>
>>>>> private static class WriteSinkStreamingTranslator<T> implements
>>>>>
>>>>>
>>>>
>>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
>>>
>>>> {
>>>>>
>>>>>    @Override
>>>>>    public void translateNode(Write.Bound<T> transform,
>>>>> FlinkStreamingTranslationContext context) {
>>>>>      String name = transform.getName();
>>>>>      PValue input = context.getInput(transform);
>>>>>
>>>>>      Sink<T> sink = transform.getSink();
>>>>>      if (!(sink instanceof UnboundedFlinkSink)) {
>>>>>        throw new UnsupportedOperationException("At the time, only
>>>>> unbounded Flink sinks are supported.");
>>>>>      }
>>>>>
>>>>>      DataStream<WindowedValue<T>> inputDataSet =
>>>>> context.getInputDataStream(input);
>>>>>
>>>>>      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>,
>>>>>
>>>> Object>()
>>>
>>>> {
>>>>
>>>>>        @Override
>>>>>        public void flatMap(WindowedValue<T> value, Collector<Object>
>>>>> out) throws Exception {
>>>>>          out.collect(value.getValue());
>>>>>        }
>>>>>      }).addSink(((UnboundedFlinkSink<Object>)
>>>>> sink).getFlinkSource()).name(name);
>>>>>    }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards
>>>>> Sumit Chawla
>>>>>
>>>>>
>>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
>>>>> kirpic...@google.com.invalid> wrote:
>>>>>
>>>>> Hi Sumit,
>>>>>>
>>>>>> All reusable parts of a pipeline, including connectors to storage
>>>>>>
>>>>> systems,
>>>>>
>>>>>> should be packaged as PTransform's.
>>>>>>
>>>>>> Sink is an advanced API that you can use under the hood to implement
>>>>>>
>>>>> the
>>>>
>>>>> transform, if this particular connector benefits from this API - but
>>>>>>
>>>>> you
>>>>
>>>>> don't have to, and many connectors indeed don't need it, and are
>>>>>>
>>>>> simpler
>>>>
>>>>> to
>>>>>
>>>>>> implement just as wrappers around a couple of ParDo's writing the
>>>>>>
>>>>> data.
>>>
>>>>
>>>>>> Even if the connector is implemented using a Sink, packaging the
>>>>>>
>>>>> connector
>>>>>
>>>>>> as a PTransform is important because it's easier to apply in a
>>>>>>
>>>>> pipeline
>>>
>>>> and
>>>>>
>>>>>> because it's more future-proof (the author of the connector may later
>>>>>> change it to use something else rather than Sink under the hood
>>>>>>
>>>>> without
>>>
>>>> breaking existing users).
>>>>>>
>>>>>> Sink is, currently, useful in the following case:
>>>>>> - You're writing a bounded amount of data (we do not yet have an
>>>>>>
>>>>> unbounded
>>>>>
>>>>>> Sink analogue)
>>>>>> - The location you're writing to is known at pipeline construction
>>>>>>
>>>>> time,
>>>>
>>>>> and does not depend on the data itself (support for "data-dependent"
>>>>>>
>>>>> sinks
>>>>>
>>>>>> is on the radar https://issues.apache.org/jira/browse/BEAM-92)
>>>>>> - The storage system you're writing to has a distinct
>>>>>>
>>>>> "initialization"
>>>
>>>> and
>>>>>
>>>>>> "finalization" step, allowing the write operation to appear atomic
>>>>>>
>>>>> (either
>>>>>
>>>>>> all data is written or none). This mostly applies to files (where
>>>>>>
>>>>> writing
>>>>
>>>>> is done by first writing to a temporary directory, and then renaming
>>>>>>
>>>>> all
>>>>
>>>>> files to their final location), but there can be other cases too.
>>>>>>
>>>>>> Here's an example GCP connector using the Sink API under the hood:
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>>>
>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
>>>>>>
>>>>> DatastoreIO,
>>>>>
>>>>>> BigtableIO etc.)
>>>>>>
>>>>>> I'm not familiar with the Flink API, however I'm a bit confused by
>>>>>>
>>>>> your
>>>
>>>> last paragraph: the Beam programming model is intentionally
>>>>>> runner-agnostic, so that you can run exactly the same code on
>>>>>>
>>>>> different
>>>
>>>> runners.
>>>>>>
>>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkcha...@gmail.com
>>>>>>
>>>>>
>>>> wrote:
>>>>>>
>>>>>> Hi
>>>>>>>
>>>>>>> Please suggest me on what is the best way to write a Sink in
>>>>>>>
>>>>>> Beam.  I
>>>
>>>> see
>>>>>
>>>>>> that there is a Sink<T> abstract class which is in experimental
>>>>>>>
>>>>>> state.
>>>>
>>>>> What is the expected outcome of this one? Do we have the api
>>>>>>>
>>>>>> frozen,
>>>
>>>> or
>>>>
>>>>> this could still change?  Most of the existing Sink implementations
>>>>>>>
>>>>>> like
>>>>>
>>>>>> KafkaIO.Write are not using this interface, and instead extends
>>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
>>>>>>>
>>>>>> extend
>>>>>>
>>>>>>> Sink<>.
>>>>>>>
>>>>>>>
>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>>
>>>>>> mandates
>>>>>>
>>>>>>> that my implementation must also implement SinkFunction<>.  In that
>>>>>>>
>>>>>> case,
>>>>>
>>>>>> none of the Sink<> methods get called anyway.
>>>>>>>
>>>>>>> Regards
>>>>>>> Sumit Chawla
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Reply via email to