Hi Lukasz,

With the API we currently provide, messages cannot be acked from a different client.
The server re-sends any unacked messages to the client once it reconnects, so there will be duplicate messages, but each one carries a "redeliver times" property in its header.

Thanks for the helpful information. I'll look into UnboundedSource.

On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik <[email protected]> wrote:

> Are duplicate messages ok?
>
> Can you ack messages from a different client or are messages sticky to a
> single client (e.g. if one client loses connection, when it reconnects can
> it ack messages it received or are those messages automatically replayed)?
>
> UnboundedSources are the only current "source" type that supports
> finalization callbacks[1] that you will need to ack messages and
> deduplication[2]. SplittableDoFn will support both of these features but
> are not there yet.
>
> 1: https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
> 2: https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93
>
>
> On Wed, Sep 19, 2018 at 8:31 PM flyisland <[email protected]> wrote:
>
>> Hi Lukasz,
>>
>> This socket server is like an MQTT server: it has queues inside it and
>> the client should ack each message.
>>
>> > Is receiving and processing each message reliably important or is it ok
>> > to drop messages when things fail?
>> A: Reliability is important; no messages should be lost.
>>
>> > Is there a message acknowledgement system or can you request a position
>> > within the message stream (e.g. send all messages from position X when
>> > connecting and if for whatever reason you need to reconnect you can say
>> > send messages from position X to replay past messages)?
>> A: The client should ack each message it receives, and the server will delete
>> the acked message.
>> If the client breaks and the server does not receive an
>> ack, the server will re-send the message to the client when it is online
>> again. There is no "position" concept like in Kafka.
>>
>>
>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> Before getting into what you could use and the current state of
>>> SplittableDoFn and its supported features, I was wondering what reliability
>>> guarantees does the socket server have around messages?
>>>
>>> Is receiving and processing each message reliably important or is it ok
>>> to drop messages when things fail?
>>> Is there a message acknowledgement system or can you request a position
>>> within the message stream (e.g. send all messages from position X when
>>> connecting and if for whatever reason you need to reconnect you can say
>>> send messages from position X to replay past messages)?
>>>
>>>
>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland <[email protected]> wrote:
>>>
>>>>
>>>> Hi Gurus,
>>>>
>>>> I'm trying to create an IO connector to fetch data from a socket server
>>>> in Beam. I'm new to Beam, but according to this blog
>>>> <https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it
>>>> seems that SDF is the recommended way to implement an IO connector now.
>>>>
>>>> This in-house socket server accepts multiple clients, but only sends
>>>> messages to the first-connected client; it sends messages to the
>>>> second client only if the first one disconnects.
>>>>
>>>> To understand the lifecycle of a DoFn, I've created a very simple
>>>> DoFn subclass that calls log.debug() in every method. According to the
>>>> JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
>>>> in-memory resources, such as network connections. The resources can then be
>>>> disposed in DoFn.Teardown." So I guess I should create the connection to the
>>>> socket server in the setup() method.
>>>>
>>>> But based on the log messages below, even though the input PCollection has
>>>> only one element, Beam still creates multiple DemoIO instances and
>>>> invokes a different DemoIO instance after every checkpoint.
>>>>
>>>> I'm wondering:
>>>> 1. How could I let Beam create only one DemoIO instance, or at least
>>>> use the same instance consistently?
>>>> 2. Or should I use the Source API for this purpose?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Logs:
>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>>>
>>>> WindowedWordCountSDF.java
>>>>
>>>> Pipeline pipeline = Pipeline.create(options);
>>>> List<String> LINES = Arrays.asList("First");
>>>> PCollection<String> input =
>>>>     pipeline
>>>>         .apply(Create.of(LINES))
>>>>         .apply(ParDo.of(new DemoIO()));
>>>> ...
>>>>
>>>>
>>>> DemoIO.java
>>>>
>>>> public class DemoIO extends DoFn<String, String> {
>>>>   private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>>>
>>>>   public DemoIO(){
>>>>     super();
>>>>     LOG.debug("{}->new DemoIO() is called!", this);
>>>>   }
>>>>
>>>>   @ProcessElement
>>>>   public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>     LOG.debug("{}->process({}) is called!", this, tracker);
>>>>
>>>>     for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>       sleep(500);
>>>>       c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>>>     }
>>>>     LOG.debug("{}->process({}) end!", this, tracker);
>>>>   }
>>>>
>>>>   @GetInitialRestriction
>>>>   public OffsetRange getInitialRestriction(String input) {
>>>>     LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>     return new OffsetRange(0, Long.MAX_VALUE);
>>>>     // return new OffsetRange(0, 100);
>>>>   }
>>>>
>>>>   @NewTracker
>>>>   public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>     LOG.debug("{}->newTracker() is called!", range);
>>>>     return new OffsetRangeTracker(range);
>>>>   }
>>>>
>>>>   @Setup
>>>>   public void setup(){
>>>>     LOG.debug("{}->setup() is called!", this);
>>>>   }
>>>>
>>>>   @StartBundle
>>>>   public void startBundle(){
>>>>     LOG.debug("{}->startBundle() is called!", this);
>>>>   }
>>>>
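
For reference, here is a minimal sketch of the two ideas discussed in this thread: acking messages only when a checkpoint is finalized, and dropping redelivered duplicates by ID. It deliberately avoids any Beam dependency and only mirrors the roles of UnboundedSource.CheckpointMark#finalizeCheckpoint() and UnboundedReader#getCurrentRecordId(). AckingClient, PendingAcks and Deduplicator are hypothetical names, and it assumes every message carries a unique ID, which the thread does not confirm for this socket server.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the in-house socket client; not a Beam API. */
interface AckingClient {
    void ack(String messageId);
}

/**
 * Plays the role of a checkpoint mark: it remembers which messages were read
 * since the last checkpoint and acks them only when the checkpoint is finalized.
 */
final class PendingAcks {
    private final AckingClient client;
    private final List<String> unackedIds = new ArrayList<>();

    PendingAcks(AckingClient client) {
        this.client = client;
    }

    /** Record a message as read but not yet safe to ack. */
    void recordRead(String messageId) {
        unackedIds.add(messageId);
    }

    /** Intended to run only after the output of these messages is durably committed. */
    void finalizeCheckpoint() {
        for (String id : unackedIds) {
            client.ack(id);
        }
        unackedIds.clear();
    }
}

/** Filters redelivered messages by ID (assumes IDs are unique per message). */
final class Deduplicator {
    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time an ID is seen, false for any redelivery. */
    boolean isFirstDelivery(String messageId) {
        return seen.add(messageId);
    }
}
```

In a real UnboundedSource the runner performs the deduplication itself when requiresDeduping() returns true, so the Deduplicator class only illustrates the effect. The point of delaying the ack is that a crash before finalization causes the server to redeliver instead of losing the message.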
