Is there information in the message that can be used as an id for deduplication?
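To make the question concrete: the "redeliver times" header only says that a message was re-sent, not which message it is, so deduplication would need some stable per-message id. Below is a minimal sketch in plain Java of what that id would be used for. It is not Beam code; MessageDeduplicator, firstSeen and the "m1"-style ids are hypothetical names for illustration, and it assumes the server attaches a unique string id to every message. In Beam itself the id would instead be returned from UnboundedReader#getCurrentRecordId of a source whose requiresDeduping() returns true (link [2] in the quoted thread), and the runner would do the filtering.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Beam): remembers the most recently seen
 * message ids so that redelivered messages can be recognised and skipped.
 */
class MessageDeduplicator {
    private final Map<String, Boolean> seen;

    MessageDeduplicator(final int capacity) {
        // Insertion-ordered map that evicts its oldest id once capacity is
        // exceeded, so memory stays bounded on an endless stream.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an id is offered, false while it is still remembered. */
    boolean firstSeen(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }
}

class DedupDemo {
    public static void main(String[] args) {
        MessageDeduplicator dedup = new MessageDeduplicator(1000);
        // Assumed delivery order: "m1" is re-sent after a reconnect.
        String[] deliveries = {"m1", "m2", "m1", "m3"};
        for (String id : deliveries) {
            String action = dedup.firstSeen(id) ? "process" : "duplicate, skip";
            System.out.println(id + " -> " + action);
        }
    }
}
```

The bounded window is a deliberate trade-off in this sketch: an id that is redelivered after it has been evicted would be processed again, so the capacity would have to cover the longest gap you expect between a delivery and its redelivery.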
On Thu, Sep 20, 2018 at 6:36 PM flyisland <fly.isl...@gmail.com> wrote:

> Hi Lukasz,
>
> With the current API we provided, messages cannot be acked from a
> different client.
>
> The server will re-send messages to the reconnected client if those
> messages were not acked. So there'll be duplicate messages, but with a
> "redeliver times" property in the header.
>
> Thanks for your helpful information, I'll check the UnboundedSources,
> thanks!
>
> On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Are duplicate messages ok?
>>
>> Can you ack messages from a different client or are messages sticky to a
>> single client (e.g. if one client loses connection, when it reconnects can
>> it ack messages it received or are those messages automatically replayed)?
>>
>> UnboundedSources are the only current "source" type that supports
>> finalization callbacks[1] that you will need to ack messages and
>> deduplication[2]. SplittableDoFn will support both of these features but
>> is not there yet.
>>
>> 1:
>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>> 2:
>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93
>>
>> On Wed, Sep 19, 2018 at 8:31 PM flyisland <fly.isl...@gmail.com> wrote:
>>
>>> Hi Lukasz,
>>>
>>> This socket server is like an MQTT server, it has queues inside it and
>>> the client should ack each message.
>>>
>>> > Is receiving and processing each message reliably important or is it
>>> > ok to drop messages when things fail?
>>> A: Reliability is important, no messages should be lost.
>>>
>>> > Is there a message acknowledgement system or can you request a
>>> > position within the message stream (e.g.
>>> > send all messages from position X when connecting and if for whatever
>>> > reason you need to reconnect you can say send messages from position X
>>> > to replay past messages)?
>>> A: The client should ack each message it receives, and the server will
>>> delete the acked message. If the client breaks and the server does not
>>> receive an ack, the server will re-send the message to the client when it
>>> is online again. And there is no "position" concept like in Kafka.
>>>
>>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Before getting into what you could use and the current state of
>>>> SplittableDoFn and its supported features, I was wondering what reliability
>>>> guarantees does the socket server have around messages?
>>>>
>>>> Is receiving and processing each message reliably important or is it ok
>>>> to drop messages when things fail?
>>>> Is there a message acknowledgement system or can you request a position
>>>> within the message stream (e.g. send all messages from position X when
>>>> connecting and if for whatever reason you need to reconnect you can say
>>>> send messages from position X to replay past messages)?
>>>>
>>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland <fly.isl...@gmail.com> wrote:
>>>>
>>>>> Hi Gurus,
>>>>>
>>>>> I'm trying to create an IO connector to fetch data from a socket
>>>>> server from Beam. I'm new to Beam, but according to this blog
>>>>> <https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it
>>>>> seems that SDF is the recommended way to implement an IO connector now.
>>>>>
>>>>> This in-house built socket server can accept multiple clients, but
>>>>> only sends messages to the first-connected client, and will send messages
>>>>> to the second client if the first one disconnects.
>>>>>
>>>>> To understand the lifecycle of a DoFn, I've just created a very simple
>>>>> DoFn subclass that calls log.debug() in every method. According to the
>>>>> JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
>>>>> in-memory resources, such as network connections. The resources can then
>>>>> be disposed in DoFn.Teardown." So I guess I should create the connection
>>>>> to the socket server in the setup() method.
>>>>>
>>>>> But based on the log messages below, even though the input PCollection
>>>>> has only one element, Beam still creates multiple DemoIO instances
>>>>> and invokes a different DemoIO instance after every checkpoint.
>>>>>
>>>>> I'm wondering:
>>>>> 1. How could I let Beam create only one DemoIO instance, or at least
>>>>> use the same instance constantly?
>>>>> 2. Or should I use the Source API for such a purpose?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> Logs:
>>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
>>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
>>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
>>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
>>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
>>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
>>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
>>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
>>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>>>>
>>>>> WindowedWordCountSDF.java
>>>>>
>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>> List<String> LINES = Arrays.asList("First");
>>>>> PCollection<String> input =
>>>>>     pipeline
>>>>>         .apply(Create.of(LINES))
>>>>>         .apply(ParDo.of(new DemoIO()));
>>>>> ...
>>>>>
>>>>> DemoIO.java
>>>>>
>>>>> public class DemoIO extends DoFn<String, String> {
>>>>>   private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>>>>
>>>>>   public DemoIO(){
>>>>>     super();
>>>>>     LOG.debug("{}->new DemoIO() is called!", this);
>>>>>   }
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>>     LOG.debug("{}->process({}) is called!", this, tracker);
>>>>>
>>>>>     for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>>       sleep(500);
>>>>>       c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>>>>     }
>>>>>     LOG.debug("{}->process({}) end!", this, tracker);
>>>>>   }
>>>>>
>>>>>   @GetInitialRestriction
>>>>>   public OffsetRange getInitialRestriction(String input) {
>>>>>     LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>>     return new OffsetRange(0, Long.MAX_VALUE);
>>>>>     // return new OffsetRange(0, 100);
>>>>>   }
>>>>>
>>>>>   @NewTracker
>>>>>   public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>>     LOG.debug("{}->newTracker() is called!", range);
>>>>>     return new OffsetRangeTracker(range);
>>>>>   }
>>>>>
>>>>>   @Setup
>>>>>   public void setup(){
>>>>>     LOG.debug("{}->setup() is called!", this);
>>>>>   }
>>>>>
>>>>>   @StartBundle
>>>>>   public void startBundle(){
>>>>>     LOG.debug("{}->startBundle() is called!", this);
>>>>>   }
>>>>>