Before getting into what you could use and the current state of
SplittableDoFn and its supported features, I was wondering what reliability
guarantees the socket server provides around messages.

Is it important to receive and process every message reliably, or is it OK
to drop messages when things fail?
Is there a message acknowledgement system, or can you request a position
within the message stream (e.g. request all messages starting at position X
when you connect, and if for whatever reason you need to reconnect, request
messages from position X again to replay past messages)?
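To make the second question concrete, here is a small plain-Java sketch of what replay by position would buy you. `ReplayableServer`, `readFrom`, and `ResumingReader` are hypothetical stand-ins, not an API your server is known to offer: the reader remembers the next position it still needs, so a dropped connection can resume without gaps or duplicates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical server that retains its message log and can replay from a position.
class ReplayableServer {
    private final List<String> log = new ArrayList<>();

    void publish(String message) {
        log.add(message);
    }

    // Returns up to maxCount messages starting at the given 0-based position.
    List<String> readFrom(long position, int maxCount) {
        int from = (int) Math.min(position, log.size());
        int to = Math.min(from + maxCount, log.size());
        return new ArrayList<>(log.subList(from, to));
    }
}

// Reader that remembers the next position it still needs, so each reconnect
// resumes where the previous connection stopped (no gaps, no duplicates).
class ResumingReader {
    private long nextPosition;
    private final List<String> received = new ArrayList<>();

    ResumingReader(long startPosition) {
        this.nextPosition = startPosition;
    }

    // Simulates one connection that delivers at most maxCount messages, then drops.
    void connectAndRead(ReplayableServer server, int maxCount) {
        for (String message : server.readFrom(nextPosition, maxCount)) {
            received.add(message);
            nextPosition++; // advance only after the message has been handled
        }
    }

    long nextPosition() { return nextPosition; }

    List<String> received() { return received; }
}
```

In an SDF, that remembered position is what an offset restriction would carry: after a checkpoint, a residual restriction such as [X, Long.MAX_VALUE) could be turned into "reconnect and send from position X".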




On Tue, Sep 18, 2018 at 5:00 PM flyisland wrote:

>
> Hi Gurus,
>
> I'm trying to create a Beam IO connector that fetches data from a socket
> server. I'm new to Beam, but according to this blog post
> <https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
> that SplittableDoFn (SDF) is now the recommended way to implement an IO
> connector.
>
> This in-house socket server can accept multiple clients, but it only sends
> messages to the first connected client; it switches to the second client
> only after the first one disconnects.
>
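If I read the description right, the server's dispatch rule can be modeled roughly as below (plain Java; `FirstClientDispatcher` and its methods are invented for illustration). It shows why a second connection, for example one opened by another DoFn instance, receives nothing until the first client disconnects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the in-house server: each message goes only to the
// earliest-connected client that is still connected.
class FirstClientDispatcher {
    // LinkedHashMap iterates in insertion order, i.e. connection order.
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    void connect(String clientId) {
        inboxes.putIfAbsent(clientId, new ArrayList<>());
    }

    void disconnect(String clientId) {
        inboxes.remove(clientId);
    }

    // Returns false when no client is connected; what the real server does
    // with such messages is unknown, this model simply drops them.
    boolean send(String message) {
        if (inboxes.isEmpty()) {
            return false;
        }
        inboxes.values().iterator().next().add(message);
        return true;
    }

    // Messages delivered to a client so far (empty if it is not connected).
    List<String> inbox(String clientId) {
        return inboxes.getOrDefault(clientId, new ArrayList<>());
    }
}
```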
> To understand the lifecycle of a DoFn, I created a very simple DoFn
> subclass that calls LOG.debug() in every method. According to the JavaDoc
> of DoFn.Setup, "This is a good place to initialize transient in-memory
> resources, such as network connections. The resources can then be disposed
> in DoFn.Teardown." So I guess I should create the connection to the socket
> server in the setup() method.
>
> But judging from the log messages below, even though the input PCollection
> has only one element, Beam still creates multiple DemoIO instances and
> invokes a different DemoIO instance after every checkpoint.
>
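The restriction ranges in the logs below follow from how an offset restriction is checkpointed: the runner shrinks the current range to end just after the last claimed offset, so the next tryClaim() fails and process() returns, and the remainder (the residual) is handled by a later call that may run on a different DoFn instance. Below is a simplified plain-Java model of that split rule; `ModelTracker` and `CheckpointModel` are not Beam classes, and the claim counts per call (2, 3, 2) are picked to mirror the logs, whereas a real runner decides on its own when to checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of an offset-range restriction tracker; NOT Beam's class.
class ModelTracker {
    private final long from;
    private long to;                 // exclusive upper bound
    private Long lastClaimed = null; // null until the first successful claim

    ModelTracker(long from, long to) {
        this.from = from;
        this.to = to;
    }

    // Succeeds only while the offset lies inside the current range [from, to).
    boolean tryClaim(long offset) {
        if (offset >= to) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    // Keeps [from, split) as the primary range and returns the residual
    // [split, oldTo), where split is just after the last claimed offset.
    long[] checkpoint() {
        long split = (lastClaimed == null) ? from : lastClaimed + 1;
        long[] residual = {split, to};
        to = split;
        return residual;
    }

    long from() { return from; }

    String range() { return "[" + from + ", " + to + ")"; }
}

class CheckpointModel {
    // Simulates one process() call per entry, starting from [start, Long.MAX_VALUE).
    // Each entry says after how many claims the (simulated) runner checkpoints;
    // entries must be >= 1, otherwise the unbounded loop would never stop.
    static List<String> run(long start, int... claimsBeforeCheckpoint) {
        List<String> primaries = new ArrayList<>();
        long[] next = {start, Long.MAX_VALUE};
        for (int claims : claimsBeforeCheckpoint) {
            if (claims < 1) {
                throw new IllegalArgumentException("claims must be >= 1");
            }
            ModelTracker tracker = new ModelTracker(next[0], next[1]);
            int claimed = 0;
            for (long i = tracker.from(); tracker.tryClaim(i); ++i) {
                // a real DoFn would output element i here
                if (++claimed == claims) {
                    next = tracker.checkpoint(); // runner-initiated in real Beam
                }
            }
            primaries.add(tracker.range());
        }
        return primaries;
    }
}
```

Run as `CheckpointModel.run(0, 2, 3, 2)`, it yields [0, 2), [2, 5) and [5, 7), the same primary ranges as the `process(...) end!` lines in the logs below.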
> I'm wondering:
> 1. How can I make Beam create only one DemoIO instance, or at least keep
> using the same instance?
> 2. Or should I use the Source API for this purpose instead?
>
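On question 1: the runner, not the DoFn, decides how many instances exist, so one common workaround is to keep the connection outside the instance, e.g. in a static, reference-counted holder that @Setup acquires and @Teardown releases. A plain-Java sketch follows; `SharedConnection`, `acquire`, and `release` are hypothetical names, and this only limits you to one connection per JVM, not one per pipeline, which matters for a server that only talks to the first client.

```java
import java.util.function.Supplier;

// Hypothetical per-JVM holder: all DoFn instances in a JVM share one connection.
// The first acquire() opens it and the last release() closes it.
final class SharedConnection<C extends AutoCloseable> {
    private final Supplier<C> opener;
    private C connection; // null while nobody holds a reference
    private int refCount;

    SharedConnection(Supplier<C> opener) {
        this.opener = opener;
    }

    // Intended to be called from @Setup.
    synchronized C acquire() {
        if (refCount == 0) {
            connection = opener.get(); // e.g. open the socket here
        }
        refCount++;
        return connection;
    }

    // Intended to be called from @Teardown.
    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        if (--refCount == 0) {
            C toClose = connection;
            connection = null;
            try {
                toClose.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared connection", e);
            }
        }
    }

    synchronized int refCount() {
        return refCount;
    }
}
```

In DemoIO this would be a `private static final` field; process() calls running in parallel on that JVM would then share the socket and still have to coordinate their reads.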
> Thanks in advance.
>
> Logs:
> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>
> WindowedWordCountSDF.java
>
> Pipeline pipeline = Pipeline.create(options);
> List<String> LINES = Arrays.asList("First");
> PCollection<String> input =
>     pipeline
>             .apply(Create.of(LINES))
>             .apply(ParDo.of(new DemoIO()));
> ...
>
>
> DemoIO.java
>
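> package org.apache.beam.examples;
>
> import org.apache.beam.sdk.io.range.OffsetRange;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
> import org.joda.time.Instant;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class DemoIO extends DoFn<String, String> {
>     private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>
>     public DemoIO(){
>         super();
>         LOG.debug("{}->new DemoIO() is called!", this);
>     }
>
>     @ProcessElement
>     public void process(ProcessContext c, OffsetRangeTracker tracker)
>             throws InterruptedException {
>         LOG.debug("{}->process({}) is called!", this, tracker);
>
>         // Claim offsets one by one; tryClaim() returns false once i falls outside
>         // the current restriction, e.g. after the runner has checkpointed it.
>         for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>             Thread.sleep(500); // simulates a new message arriving every 500 ms
>             c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>         }
>         LOG.debug("{}->process({}) end!", this, tracker);
>     }
>
>     @GetInitialRestriction
>     public OffsetRange getInitialRestriction(String input) {
>         LOG.debug("{}->getInitialRestriction() is called!", input);
>         // Effectively unbounded: [0, Long.MAX_VALUE)
>         return new OffsetRange(0, Long.MAX_VALUE);
> //        return new OffsetRange(0, 100);
>     }
>
>     @NewTracker
>     public OffsetRangeTracker newTracker(OffsetRange range) {
>         LOG.debug("{}->newTracker() is called!", range);
>         return new OffsetRangeTracker(range);
>     }
>
>     @Setup
>     public void setup(){
>         LOG.debug("{}->setup() is called!", this);
>     }
>
>     @StartBundle
>     public void startBundle(){
>         LOG.debug("{}->startBundle() is called!", this);
>     }
>
>     // ... remaining methods (e.g. the @FinishBundle finishBundle() that
>     // appears in the logs above) not shown ...
> }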
