Hi Raghu,

Thanks very much! Yes, I think I should focus on the UnboundedSource API for
now.

On Sat, Sep 22, 2018 at 7:01 AM Raghu Angadi <[email protected]> wrote:

> > This in-house built socket server can accept multiple clients, but
> > only sends messages to the first connected client, and sends messages to
> > the second client if the first one disconnects.
>
> The server sending messages only to the first client connection is quite
> critical. Even if you use the Source API, which honors the 'Setup()' JavaDoc,
> that is not enough in your case. Note that it says it reuses instances, but
> that does not guarantee a single DoFn instance, or when it actually calls
> Teardown(). It is on a best-effort basis. The work could move to a different
> worker, and the DoFn instance on the earlier worker can live for a long time.
> So if you held the connection to the server until Teardown() is called, you
> could inadvertently block reads from the DoFn on the new worker. If you want
> to keep the connection open across bundles, you need some way to close an
> idle connection asynchronously (alternatively, your service might have a
> timeout to close an idle client connection, which is much better). Since you
> can't afford to wait until Teardown(), you might as well have a singleton
> connection that gets closed after some idle time.
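To make the "singleton connection that gets closed after some idle time" idea concrete, here is a minimal stdlib-only Java sketch. It is not tied to any Beam API. The class name IdleClosingConnection, the opener supplier, and the timeout parameters are hypothetical, and the sketch assumes callers call acquire() every time they use the connection, so that a long blocking read done with a stale reference could still be closed underneath them.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical holder for a single shared connection (e.g. kept in a static field,
 * one per worker JVM). A background task closes the connection once it has been idle
 * for idleTimeoutMillis, instead of relying on DoFn Teardown().
 */
final class IdleClosingConnection<C extends AutoCloseable> {
  private final Supplier<C> opener;
  private final long idleTimeoutMillis;
  private C connection;         // guarded by this; null when closed
  private long lastUsedMillis;  // guarded by this

  IdleClosingConnection(Supplier<C> opener, long idleTimeoutMillis, long checkEveryMillis) {
    this.opener = opener;
    this.idleTimeoutMillis = idleTimeoutMillis;
    ScheduledExecutorService reaper = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "idle-connection-reaper");
      t.setDaemon(true);  // the reaper alone must not keep the JVM alive
      return t;
    });
    reaper.scheduleWithFixedDelay(
        this::closeIfIdle, checkEveryMillis, checkEveryMillis, TimeUnit.MILLISECONDS);
  }

  /** Returns the shared connection, opening a new one if it was closed. Call on every use. */
  synchronized C acquire() {
    if (connection == null) {
      connection = opener.get();
    }
    lastUsedMillis = System.currentTimeMillis();
    return connection;
  }

  synchronized boolean isOpen() {
    return connection != null;
  }

  /** Closes the connection if nobody has called acquire() for idleTimeoutMillis. */
  synchronized void closeIfIdle() {
    if (connection != null && System.currentTimeMillis() - lastUsedMillis >= idleTimeoutMillis) {
      try {
        connection.close();
      } catch (Exception e) {
        // Best effort: the connection reference is dropped either way.
      }
      connection = null;
    }
  }
}
```

Keeping one such holder in a static field would give one connection per worker JVM rather than one per DoFn instance. A server-side idle timeout, as suggested above, would still be the more robust option, since it also covers a worker that dies without closing anything.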
>
> Assuming you need to ack on the same connection that served the records,
> the finalize() functionality in the UnboundedSource API is important in this
> case. You can use the UnboundedSource API for now.
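The finalize() idea can be sketched without Beam. In Beam, the corresponding hook is finalizeCheckpoint() on UnboundedSource.CheckpointMark; the class below only models that idea and does not implement the Beam interface. AckClient, deliveryTag and AckingCheckpointMark are hypothetical names, and the sketch assumes the server's ack call takes some per-delivery handle and that one AckClient object stands for one specific connection.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handle for one specific connection; once closed it stays closed. */
interface AckClient {
  boolean isOpen();
  void ack(long deliveryTag);
}

/**
 * Models the idea behind CheckpointMark.finalizeCheckpoint(): remember what was read
 * since the previous checkpoint and ack it only after the runner has committed the
 * checkpoint, so no message is deleted on the server before it is safely processed.
 */
final class AckingCheckpointMark {
  private final AckClient client;        // only meaningful on the worker that read the messages
  private final List<Long> pendingTags;  // delivery tags read since the previous checkpoint

  AckingCheckpointMark(AckClient client, List<Long> tagsSinceLastCheckpoint) {
    this.client = client;
    this.pendingTags = new ArrayList<>(tagsSinceLastCheckpoint);
  }

  /** Acks all pending messages on the original connection; returns how many were acked. */
  int finalizeCheckpoint() {
    int acked = 0;
    if (client != null && client.isOpen()) {
      for (long tag : pendingTags) {
        client.ack(tag);
        acked++;
      }
    }
    // If the connection is gone, acks cannot come from another client: the server will
    // redeliver these messages and deduplication has to drop the copies.
    pendingTags.clear();  // a repeated call is a no-op
    return acked;
  }
}
```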
>
> On Thu, Sep 20, 2018 at 8:25 PM flyisland <[email protected]> wrote:
>
>> Hi Reuven,
>>
>> There is no explicit ID in the message itself, and whether any
>> information can be used as an ID depends on the use case.
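Since there is no explicit ID, one option is to derive one from the message body and let Beam's deduplication drop redelivered copies: an UnboundedSource can return true from requiresDeduping(), and its reader then supplies a byte[] id from UnboundedReader.getCurrentRecordId(). The stdlib-only sketch below assumes that two distinct messages never have byte-identical bodies, which depends on the use case as noted above. RecordDedup and its bounded "recently seen" filter are hypothetical and only illustrate dropping recent duplicates; they are not how a runner implements deduplication.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.Map;

final class RecordDedup {
  /**
   * Derives a record id from the message body only, not from headers such as the
   * "redeliver times" counter, which changes on every redelivery. If two genuinely
   * different messages can have identical bodies, this would drop real data.
   */
  static String contentId(String body) {
    try {
      byte[] digest =
          MessageDigest.getInstance("SHA-256").digest(body.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 must be available on every Java platform", e);
    }
  }

  private final Map<String, Boolean> recentIds;

  /** Remembers at most maxIds ids, evicting the least recently seen one first. */
  RecordDedup(int maxIds) {
    this.recentIds = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
        return size() > maxIds;
      }
    };
  }

  /** True the first time an id is seen (keep the record), false for a recent duplicate. */
  boolean firstTime(String id) {
    return recentIds.put(id, Boolean.TRUE) == null;
  }
}
```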
>>
>> On Fri, Sep 21, 2018 at 11:05 AM Reuven Lax <[email protected]> wrote:
>>
>>> Is there information in the message that can be used as an ID for
>>> deduplication?
>>>
>>> On Thu, Sep 20, 2018 at 6:36 PM flyisland <[email protected]> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> With the current API we provide, messages cannot be acked from a
>>>> different client.
>>>>
>>>> The server will re-send messages to the reconnected client if those
>>>> messages were not acked. So there will be duplicate messages, but each
>>>> redelivered one carries a "redeliver times" property in its header.
>>>>
>>>> Thanks for the helpful information; I'll look into UnboundedSource.
>>>>
>>>>
>>>>
>>>> On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Are duplicate messages ok?
>>>>>
>>>>> Can you ack messages from a different client or are messages sticky to
>>>>> a single client (e.g. if one client loses connection, when it reconnects
>>>>> can it ack messages it received or are those messages automatically
>>>>> replayed)?
>>>>>
>>>>> UnboundedSource is currently the only "source" type that supports both
>>>>> finalization callbacks[1], which you will need to ack messages, and
>>>>> deduplication[2]. SplittableDoFn will support both of these features, but
>>>>> it does not yet.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>>>>> 2:
>>>>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93
>>>>>
>>>>>
>>>>> On Wed, Sep 19, 2018 at 8:31 PM flyisland <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> This socket server is like an MQTT server: it has queues inside it,
>>>>>> and the client must ack each message.
>>>>>>
>>>>>> > Is receiving and processing each message reliably important or is
>>>>>> > it ok to drop messages when things fail?
>>>>>> A: Reliability is important; no messages should be lost.
>>>>>>
>>>>>> > Is there a message acknowledgement system or can you request a
>>>>>> > position within the message stream (e.g. send all messages from
>>>>>> > position X when connecting and if for whatever reason you need to
>>>>>> > reconnect you can say send messages from position X to replay past
>>>>>> > messages)?
>>>>>> A: The client should ack each message it receives, and the server
>>>>>> deletes acked messages. If the client breaks and the server does not
>>>>>> receive an ack, the server re-sends the message to the client when it
>>>>>> comes back online. There is no "position" concept like in Kafka.
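The server semantics described in this thread (ack per message, redelivery of unacked messages with a "redeliver times" header, and only the first connected client receiving messages) can be summarized as a small in-memory Java model. It is a hypothetical sketch for reasoning about the connector, not the server's real API; the per-delivery tag used for acking is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the socket server's queue semantics. */
final class QueueServerModel {
  /** One delivery of a message; a redelivery gets a new tag. */
  static final class Delivery {
    final long tag;            // per-delivery handle used for acking (assumed)
    final String payload;
    final int redeliverTimes;  // models the "redeliver times" header

    Delivery(long tag, String payload, int redeliverTimes) {
      this.tag = tag;
      this.payload = payload;
      this.redeliverTimes = redeliverTimes;
    }
  }

  private final Deque<Delivery> ready = new ArrayDeque<>();
  private final Map<Long, Delivery> inFlight = new LinkedHashMap<>();  // delivered, not yet acked
  private final List<String> clients = new ArrayList<>();              // clients.get(0) is active
  private long nextTag = 0;

  void publish(String payload) {
    ready.addLast(new Delivery(nextTag++, payload, 0));
  }

  void connect(String clientId) {
    clients.add(clientId);  // later clients wait until every earlier one disconnects
  }

  private boolean isActive(String clientId) {
    return !clients.isEmpty() && clients.get(0).equals(clientId);
  }

  /** Next message for clientId, or null if it is not the active client or nothing is ready. */
  Delivery poll(String clientId) {
    if (!isActive(clientId) || ready.isEmpty()) {
      return null;
    }
    Delivery d = ready.pollFirst();
    inFlight.put(d.tag, d);
    return d;
  }

  /** Acking deletes the message; acks only count on the client that received it. */
  boolean ack(String clientId, long tag) {
    return isActive(clientId) && inFlight.remove(tag) != null;
  }

  /** If the active client disconnects, its unacked messages are queued again, ahead of new ones. */
  void disconnect(String clientId) {
    boolean wasActive = isActive(clientId);
    clients.remove(clientId);
    if (!wasActive) {
      return;
    }
    List<Delivery> redelivered = new ArrayList<>();
    for (Delivery d : inFlight.values()) {
      redelivered.add(new Delivery(nextTag++, d.payload, d.redeliverTimes + 1));
    }
    inFlight.clear();
    for (int i = redelivered.size() - 1; i >= 0; i--) {
      ready.addFirst(redelivered.get(i));  // walk backwards so the original order is kept
    }
  }
}
```

Issuing a new tag on redelivery is one way to reflect that acks cannot come from a different client: a tag handed out on the old connection no longer matches anything in flight.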
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> Before getting into what you could use and the current state of
>>>>>>> SplittableDoFn and its supported features, I was wondering what
>>>>>>> reliability guarantees the socket server has around messages.
>>>>>>>
>>>>>>> Is receiving and processing each message reliably important or is it
>>>>>>> ok to drop messages when things fail?
>>>>>>> Is there a message acknowledgement system or can you request a
>>>>>>> position within the message stream (e.g. send all messages from
>>>>>>> position X when connecting and if for whatever reason you need to
>>>>>>> reconnect you can say send messages from position X to replay past
>>>>>>> messages)?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi Gurus,
>>>>>>>>
>>>>>>>> I'm trying to create a Beam IO connector to fetch data from a socket
>>>>>>>> server. I'm new to Beam, but according to this blog <
>>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it
>>>>>>>> seems that SDF is now the recommended way to implement an IO connector.
>>>>>>>>
>>>>>>>> This in-house built socket server can accept multiple clients,
>>>>>>>> but only sends messages to the first connected client, and sends
>>>>>>>> messages to the second client if the first one disconnects.
>>>>>>>>
>>>>>>>> To understand the lifecycle of a DoFn, I've created a very simple
>>>>>>>> DoFn subclass that calls log.debug() in every method. According to
>>>>>>>> the JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
>>>>>>>> in-memory resources, such as network connections. The resources can then
>>>>>>>> be disposed in DoFn.Teardown." So I guess I should create the connection
>>>>>>>> to the socket server in the setup() method.
>>>>>>>>
>>>>>>>> But based on the log messages below, even though the input PCollection
>>>>>>>> has only one element, Beam still creates multiple DemoIO instances and
>>>>>>>> invokes a different DemoIO instance after every checkpoint.
>>>>>>>>
>>>>>>>> I'm wondering:
>>>>>>>> 1. How can I make Beam create only one DemoIO instance, or at
>>>>>>>> least use the same instance consistently?
>>>>>>>> 2. Or should I use the Source API for such purpose?
>>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>> Logs:
>>>>>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>>>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
>>>>>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>>>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>>>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>>>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>>>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>>>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>>>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
>>>>>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>>>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
>>>>>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>>>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>>>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
>>>>>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
>>>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>>>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
>>>>>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>>>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
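The restriction ranges in these logs ([0, 2), then [2, ...), then [5, ...)) are consistent with a checkpoint that keeps [from, lastClaimedOffset + 1) for the current process() call and hands the rest back as a residual, which the runner later resumes, here on a new DoFn instance each time. The toy tracker below is a hypothetical, simplified model written only to explain that pattern; it is not Beam's OffsetRangeTracker.

```java
/**
 * Hypothetical, simplified offset-range tracker used only to explain the DemoIO logs.
 * It is not Beam's OffsetRangeTracker.
 */
final class ToyOffsetTracker {
  private final long from;
  private long to;            // exclusive; shrinks when the runner checkpoints
  private Long lastClaimed;   // null until the first successful claim
  private Long lastAttempted;

  ToyOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Claims offset i if it is still inside the current range. */
  synchronized boolean tryClaim(long i) {
    lastAttempted = i;
    if (i >= to) {
      return false;  // range was shrunk by checkpoint(): the process() loop ends
    }
    lastClaimed = i;
    return true;
  }

  /** Keeps [from, lastClaimed + 1) for this call and returns the residual [lastClaimed + 1, to). */
  synchronized long[] checkpoint() {
    long splitAt = (lastClaimed == null) ? from : lastClaimed + 1;
    long[] residual = {splitAt, to};
    to = splitAt;
    return residual;
  }

  synchronized long[] currentRange() {
    return new long[] {from, to};
  }

  synchronized Long lastAttempted() {
    return lastAttempted;
  }
}
```

Fed with claims of offsets 0 and 1, a checkpoint, and then a failed tryClaim(2), the model ends in the same state as the first process() call in the logs: range [0, 2), last claimed offset 1, last attempted offset 2, with [2, 9223372036854775807) left for the next call.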
>>>>>>>>
>>>>>>>> WindowedWordCountSDF.java
>>>>>>>>
>>>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>>> List<String> LINES = Arrays.asList("First");
>>>>>>>> PCollection<String> input =
>>>>>>>>     pipeline
>>>>>>>>             .apply(Create.of(LINES))
>>>>>>>>             .apply(ParDo.of(new DemoIO()));
>>>>>>>> ...
>>>>>>>>
>>>>>>>>
>>>>>>>> DemoIO.java
>>>>>>>>
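>>>>>>>> package org.apache.beam.examples;
>>>>>>>>
>>>>>>>> import org.apache.beam.sdk.io.range.OffsetRange;
>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>> import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
>>>>>>>> import org.joda.time.Instant;
>>>>>>>> import org.slf4j.Logger;
>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>
>>>>>>>> public class DemoIO extends DoFn<String, String> {
>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>>>>>>>
>>>>>>>>     public DemoIO(){
>>>>>>>>         super();
>>>>>>>>         LOG.debug("{}->new DemoIO() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>>>>>         LOG.debug("{}->process({}) is called!", this, tracker);
>>>>>>>>
>>>>>>>>         // Claim offsets one by one; tryClaim() returns false once the runner
>>>>>>>>         // has checkpointed (shrunk) the restriction, which ends this call.
>>>>>>>>         for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>>>>>             sleep(500);
>>>>>>>>             c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>>>>>>>         }
>>>>>>>>         LOG.debug("{}->process({}) end!", this, tracker);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @GetInitialRestriction
>>>>>>>>     public OffsetRange getInitialRestriction(String input) {
>>>>>>>>         LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>>>>>         return new OffsetRange(0, Long.MAX_VALUE);
>>>>>>>> //        return new OffsetRange(0, 100);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @NewTracker
>>>>>>>>     public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>>>>>         LOG.debug("{}->newTracker() is called!", range);
>>>>>>>>         return new OffsetRangeTracker(range);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Setup
>>>>>>>>     public void setup(){
>>>>>>>>         LOG.debug("{}->setup() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @StartBundle
>>>>>>>>     public void startBundle(){
>>>>>>>>         LOG.debug("{}->startBundle() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @FinishBundle
>>>>>>>>     public void finishBundle(){
>>>>>>>>         LOG.debug("{}->finishBundle() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     // Helper assumed by the sleep(500) call above; restores the interrupt
>>>>>>>>     // flag instead of propagating InterruptedException.
>>>>>>>>     private static void sleep(long millis) {
>>>>>>>>         try {
>>>>>>>>             Thread.sleep(millis);
>>>>>>>>         } catch (InterruptedException e) {
>>>>>>>>             Thread.currentThread().interrupt();
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>> }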
