Hi Raghu,

Thanks very much! Yes, I think I should focus on the UnboundedSource API for now.
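
For reference, here is a minimal sketch of the ack-on-finalize pattern discussed in this thread. It is plain Java with no Beam dependency and only mirrors the shape of `UnboundedSource.CheckpointMark#finalizeCheckpoint()`. The class names, the `long` message handle, and the `acker` callback are all hypothetical, since the socket client API is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical checkpoint state: handles of messages read since the previous checkpoint. */
final class AckOnFinalizeMark {
  private final List<Long> pendingHandles;
  private final LongConsumer acker; // assumed to send one ack over the live connection

  AckOnFinalizeMark(List<Long> pendingHandles, LongConsumer acker) {
    this.pendingHandles = pendingHandles;
    this.acker = acker;
  }

  /** Meant to be called once the records covered by this mark are durably committed. */
  void finalizeCheckpoint() {
    for (long handle : pendingHandles) {
      acker.accept(handle);
    }
    pendingHandles.clear(); // a repeated call acks nothing twice
  }
}

/** Hypothetical reader side: remembers what it has emitted but not yet acked. */
final class AckingReader {
  private final LongConsumer acker;
  private List<Long> sinceLastCheckpoint = new ArrayList<>();

  AckingReader(LongConsumer acker) {
    this.acker = acker;
  }

  /** Record a message that was just emitted downstream. */
  void onMessage(long handle) {
    sinceLastCheckpoint.add(handle);
  }

  /** Hand the current batch to a new mark and start collecting the next batch. */
  AckOnFinalizeMark getCheckpointMark() {
    List<Long> batch = sinceLastCheckpoint;
    sinceLastCheckpoint = new ArrayList<>();
    return new AckOnFinalizeMark(batch, acker);
  }
}
```

Nothing is acked until the mark is finalized, so a message whose checkpoint never commits stays unacked and the server re-sends it, which matches the redelivery behaviour described in the thread.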

On Sat, Sep 22, 2018 at 7:01 AM Raghu Angadi wrote:

> > This in-house built socket server can accept multiple clients, but
> > only sends messages to the first-connected client, and will send
> > messages to the second client once the first one disconnects.
>
> The server sending messages only to the first client connection is quite
> critical. Even if you use the Source API, which honors the 'Setup()'
> JavaDoc, it is not enough in your case. Note that it says it reuses
> instances, but that does not guarantee a single DoFn instance, nor when
> TearDown() is actually called. Reuse is on a best-effort basis. The work
> could move to a different worker while the DoFn instance on the earlier
> worker lives on for a long time. So if you held the connection to the
> server until TearDown() is called, you could be inadvertently blocking
> reads from the DoFn on the new worker. If you want to keep the connection
> open across bundles, you need some way to close an idle connection
> asynchronously (alternatively, your service might have a timeout that
> closes an idle client connection, which is much better). Since you can't
> afford to wait until TearDown(), you might as well have a singleton
> connection that gets closed after some idle time.
>
> Assuming you need to ack on the same connection that served the records,
> the finalize() functionality in the UnboundedSource API is an important
> case. You can use the UnboundedSource API for now.
>
> On Thu, Sep 20, 2018 at 8:25 PM flyisland wrote:
>
>> Hi Reuven,
>>
>> There is no explicit ID in the message itself; whether any information
>> in it can be used as an ID depends on the use case.
>>
>> On Fri, Sep 21, 2018 at 11:05 AM Reuven Lax wrote:
>>
>>> Is there information in the message that can be used as an id, that can
>>> be used for deduplication?
>>>
>>> On Thu, Sep 20, 2018 at 6:36 PM flyisland wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> With the current API we provided, messages cannot be acked from a
>>>> different client.
>>>>
>>>> The server will re-send messages to the reconnected client if those
>>>> messages were not acked. So there'll be duplicate messages, but with a
>>>> "redeliver times" property in the header.
>>>>
>>>> Thanks for the helpful information, I'll look into UnboundedSource!
>>>>
>>>> On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik wrote:
>>>>
>>>>> Are duplicate messages ok?
>>>>>
>>>>> Can you ack messages from a different client or are messages sticky to
>>>>> a single client (e.g. if one client loses connection, when it reconnects
>>>>> can it ack messages it received or are those messages automatically
>>>>> replayed)?
>>>>>
>>>>> UnboundedSources are the only current "source" type that supports
>>>>> finalization callbacks[1], which you will need to ack messages, and
>>>>> deduplication[2]. SplittableDoFn will support both of these features
>>>>> but is not there yet.
>>>>>
>>>>> 1: https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>>>>> 2: https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93
>>>>>
>>>>> On Wed, Sep 19, 2018 at 8:31 PM flyisland wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> This socket server is like an MQTT server: it has queues inside it
>>>>>> and the client should ack each message.
>>>>>>
>>>>>> > Is receiving and processing each message reliably important or is
>>>>>> > it ok to drop messages when things fail?
>>>>>> A: Reliability is important; no messages should be lost.
>>>>>>
>>>>>> > Is there a message acknowledgement system or can you request a
>>>>>> > position within the message stream (e.g. send all messages from
>>>>>> > position X when connecting and if for whatever reason you need to
>>>>>> > reconnect you can say send messages from position X to replay past
>>>>>> > messages)?
>>>>>> A: The client should ack each message it receives, and the server will
>>>>>> delete the acked message. If the client breaks and the server does not
>>>>>> receive an ack, the server will re-send the message once the client is
>>>>>> online again. There is no "position" concept as in Kafka.
>>>>>>
>>>>>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik wrote:
>>>>>>
>>>>>>> Before getting into what you could use and the current state of
>>>>>>> SplittableDoFn and its supported features, I was wondering what
>>>>>>> reliability guarantees the socket server has around messages.
>>>>>>>
>>>>>>> Is receiving and processing each message reliably important or is it
>>>>>>> ok to drop messages when things fail?
>>>>>>> Is there a message acknowledgement system or can you request a
>>>>>>> position within the message stream (e.g. send all messages from
>>>>>>> position X when connecting and if for whatever reason you need to
>>>>>>> reconnect you can say send messages from position X to replay past
>>>>>>> messages)?
>>>>>>>
>>>>>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland wrote:
>>>>>>>
>>>>>>>> Hi Gurus,
>>>>>>>>
>>>>>>>> I'm trying to create an IO connector to fetch data from a socket
>>>>>>>> server from Beam. I'm new to Beam, but according to this blog
>>>>>>>> <https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it
>>>>>>>> seems that SDF is the recommended way to implement an IO connector now.
>>>>>>>>
>>>>>>>> This in-house built socket server can accept multiple clients, but
>>>>>>>> only sends messages to the first-connected client, and will send
>>>>>>>> messages to the second client once the first one disconnects.
>>>>>>>>
>>>>>>>> To understand the lifecycle of a DoFn, I've just created a very
>>>>>>>> simple DoFn subclass that calls log.debug() in every method. According
>>>>>>>> to the JavaDoc of DoFn.Setup(), "This is a good place to initialize
>>>>>>>> transient in-memory resources, such as network connections. The
>>>>>>>> resources can then be disposed in DoFn.Teardown." So I guess I should
>>>>>>>> create the connection to the socket server in the setup() method.
>>>>>>>>
>>>>>>>> But based on the log messages below, even though the input PCollection
>>>>>>>> has only one element, Beam still creates multiple DemoIO instances and
>>>>>>>> invokes a different DemoIO instance after every checkpoint.
>>>>>>>>
>>>>>>>> I'm wondering:
>>>>>>>> 1. How could I let Beam create only one DemoIO instance, or at
>>>>>>>>    least use the same instance constantly?
>>>>>>>> 2. Or should I use the Source API for this purpose?
>>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>> Logs:
>>>>>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>>>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
>>>>>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>>>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>>>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>>>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>>>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>>>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>>>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
>>>>>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>>>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
>>>>>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>>>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>>>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
>>>>>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
>>>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>>>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
>>>>>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>>>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>>>>>>>
>>>>>>>> WindowedWordCountSDF.java
>>>>>>>>
>>>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>>> List<String> LINES = Arrays.asList("First");
>>>>>>>> PCollection<String> input =
>>>>>>>>     pipeline
>>>>>>>>         .apply(Create.of(LINES))
>>>>>>>>         .apply(ParDo.of(new DemoIO()));
>>>>>>>> ...
>>>>>>>>
>>>>>>>> DemoIO.java
>>>>>>>>
>>>>>>>> public class DemoIO extends DoFn<String, String> {
>>>>>>>>     private static final Logger LOG =
>>>>>>>>         LoggerFactory.getLogger(DemoIO.class);
>>>>>>>>
>>>>>>>>     public DemoIO(){
>>>>>>>>         super();
>>>>>>>>         LOG.debug("{}->new DemoIO() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>>>>>         LOG.debug("{}->process({}) is called!", this, tracker);
>>>>>>>>
>>>>>>>>         for (long i = tracker.currentRestriction().getFrom();
>>>>>>>>                 tracker.tryClaim(i); ++i) {
>>>>>>>>             sleep(500);
>>>>>>>>             c.outputWithTimestamp(i + " -> " + c.element(),
>>>>>>>>                 Instant.now());
>>>>>>>>         }
>>>>>>>>         LOG.debug("{}->process({}) end!", this, tracker);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @GetInitialRestriction
>>>>>>>>     public OffsetRange getInitialRestriction(String input) {
>>>>>>>>         LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>>>>>         return new OffsetRange(0, Long.MAX_VALUE);
>>>>>>>>         // return new OffsetRange(0, 100);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @NewTracker
>>>>>>>>     public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>>>>>         LOG.debug("{}->newTracker() is called!", range);
>>>>>>>>         return new OffsetRangeTracker(range);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Setup
>>>>>>>>     public void setup(){
>>>>>>>>         LOG.debug("{}->setup() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @StartBundle
>>>>>>>>     public void startBundle(){
>>>>>>>>         LOG.debug("{}->startBundle() is called!", this);
>>>>>>>>     }
>>>>>>>>
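
On the connection lifecycle raised in this thread: since an instance created in setup() may live on long after the work has moved elsewhere, one option mentioned was a singleton connection that gets closed after some idle time. Below is a minimal sketch of that idea in plain Java. `IdleClosingConnection`, its method names, and the timeout handling are hypothetical and not part of Beam; the connection type is simply assumed to be a `Closeable`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical holder: shares one connection and closes it once it has been idle. */
final class IdleClosingConnection<C extends Closeable> {
  private final Supplier<C> opener; // opens a fresh connection on demand
  private final long idleTimeoutMillis;
  private C connection; // null while disconnected
  private long lastUsedMillis;

  IdleClosingConnection(Supplier<C> opener, long idleTimeoutMillis) {
    this.opener = opener;
    this.idleTimeoutMillis = idleTimeoutMillis;
  }

  /** Returns the shared connection, opening it if needed, and records the use. */
  synchronized C acquire(long nowMillis) {
    if (connection == null) {
      connection = opener.get();
    }
    lastUsedMillis = nowMillis;
    return connection;
  }

  /** Closes the connection if it has been unused for the timeout; true if one was closed. */
  synchronized boolean closeIfIdle(long nowMillis) {
    if (connection == null || nowMillis - lastUsedMillis < idleTimeoutMillis) {
      return false;
    }
    try {
      connection.close();
    } catch (IOException e) {
      // Best effort: the connection is dropped either way.
    }
    connection = null;
    return true;
  }

  synchronized boolean isOpen() {
    return connection != null;
  }

  /** Checks for idleness periodically on a daemon thread, independent of any teardown hook. */
  ScheduledExecutorService startReaper(long periodMillis) {
    ScheduledExecutorService reaper =
        Executors.newSingleThreadScheduledExecutor(r -> {
          Thread t = new Thread(r, "idle-connection-reaper");
          t.setDaemon(true);
          return t;
        });
    reaper.scheduleAtFixedRate(
        () -> closeIfIdle(System.currentTimeMillis()),
        periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    return reaper;
  }
}
```

Callers would call `acquire(System.currentTimeMillis())` before each read so the idle clock is reset; a read that blocks for longer than the timeout could still have its connection closed underneath it, so the timeout needs to be chosen with that in mind.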
