[ https://issues.apache.org/jira/browse/BEAM-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717597#comment-16717597 ]

Maximilian Michels commented on BEAM-4520:
------------------------------------------

We should at least WARN or throw an exception if the source needs checkpointing 
and we are not configured to run with checkpointing.
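For context, enabling checkpointing on the FlinkRunner looks roughly like the
sketch below (the interval value is illustrative). Without a checkpoint
interval, the unbounded Pubsub source's checkpoint marks are never finalized,
which would explain the unacked-message symptom reported below.

{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch only: configure the Flink runner with a checkpoint interval so that
// UnboundedSource.CheckpointMark.finalizeCheckpoint() runs and Pubsub acks happen.
FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setStreaming(true);
options.setCheckpointingInterval(10000L); // e.g. checkpoint every 10 seconds
Pipeline p = Pipeline.create(options);
{code}

The same option can be passed on the command line as --checkpointingInterval=10000.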

> No messages delivered after a while with PubsubIO
> -------------------------------------------------
>
>                 Key: BEAM-4520
>                 URL: https://issues.apache.org/jira/browse/BEAM-4520
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp, runner-flink
>    Affects Versions: 2.4.0
>            Reporter: Hrish
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> I am running the following Beam pipeline code locally with the FlinkRunner. 
> PubsubIO is used to read messages from a topic. I have a separate thread that 
> publishes messages to the topic at regular intervals (every 30 seconds) and 
> also sets the "ts" attribute, which is used later to derive the event time.
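> For reference, a minimal sketch of what that publisher thread does per 
> message (topic name, payload, and key value are illustrative; PubsubIO 
> documents the timestamp attribute as millis since epoch or RFC 3339):
> {code:java}
> import com.google.cloud.pubsub.v1.Publisher;
> import com.google.protobuf.ByteString;
> import com.google.pubsub.v1.PubsubMessage;
> 
> // Sketch only: publish a message carrying "ts" (event time) and "key"
> // (grouping key) as attributes. Note this is the Pub/Sub client's
> // PubsubMessage, not Beam's class of the same name.
> Publisher publisher =
>         Publisher.newBuilder("projects/" + projectName + "/topics/beamtest").build();
> PubsubMessage message = PubsubMessage.newBuilder()
>         .setData(ByteString.copyFromUtf8("payload"))
>         .putAttributes("ts", String.valueOf(System.currentTimeMillis()))
>         .putAttributes("key", "some-key")
>         .build();
> publisher.publish(message); // returns an ApiFuture<String> with the message id
> {code}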
> Custom transform to convert each message to a KV pair -
> {code:java}
> // Keys each PubsubMessage by its "key" attribute for the downstream GroupByKey.
> private static class PubSubMessageGrouper
>         extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         PubsubMessage element = c.element();
>         KV<String, PubsubMessage> kv =
>                 KV.of(element.getAttribute("key"), element);
>         c.output(kv);
>     }
> }
> {code}
> Note that "key" is an attribute set on the message earlier in the publisher 
> thread; the intent is to group the messages downstream by this key.
> Pipeline code -
> {code:java}
> PCollection<PubsubMessage> pubsubColl = p
>         .apply(PubsubIO.readMessagesWithAttributes()
>             .withTimestampAttribute("ts")
>             .fromTopic("projects/" + projectName + "/topics/beamtest"));
> PCollection<KV<String, PubsubMessage>> idfied =
>         pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));
> PCollection<KV<String, PubsubMessage>> windowed = idfied
>         .apply(Window.<KV<String, PubsubMessage>>into(
>                 FixedWindows.of(Duration.standardSeconds(15)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.standardSeconds(15))
>             .discardingFiredPanes());
> PCollection<KV<String, Iterable<PubsubMessage>>> grouped =
>         windowed.apply(GroupByKey.create());
> grouped.apply(ParDo.of(new KVPrinter()));
> {code}
> The transforms are not chained, for ease of reading. The KVPrinter transform 
> at the end just prints the messages coming out of the GroupByKey; it will be 
> replaced by actual code once I get this running. When I run this, the trigger 
> does not fire for quite some time (a couple of minutes or longer). When it 
> finally fires, some of the messages never show up in the final step, no 
> matter how long I keep the pipeline running. The Pubsub statistics in my 
> GCP/Stackdriver dashboard show a backlog of undelivered messages.
> Is this due to the internal watermark that PubsubIO uses? My intention is to 
> make sure that all messages are processed in the GroupByKey, including late 
> ones within the allowed lateness.
> Note that if I remove the GroupByKey and just print the messages after the 
> windowing, I can see all the messages.
> --------
> An update: I switched the .fromTopic to a .fromSubscription with a 
> pre-created subscription, and things have started working, i.e. I can see 
> messages being delivered now. The only difference I can see is that the 
> subscription PubsubIO creates automatically has an ack deadline of 60 
> seconds, whereas I created mine with 600 seconds (the maximum Pubsub allows).
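> For completeness, a rough sketch of that workaround (the subscription name 
> is illustrative; 600 seconds is the ack deadline I used):
> {code:java}
> import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
> import com.google.pubsub.v1.PushConfig;
> 
> // Sketch only: pre-create a pull subscription with a 600s ack deadline,
> // then point PubsubIO at the subscription instead of the topic.
> try (SubscriptionAdminClient admin = SubscriptionAdminClient.create()) {
>     admin.createSubscription(
>         "projects/" + projectName + "/subscriptions/beamtest-sub",
>         "projects/" + projectName + "/topics/beamtest",
>         PushConfig.getDefaultInstance(), // empty push config = pull subscription
>         600);                            // ack deadline in seconds
> }
> 
> PCollection<PubsubMessage> pubsubColl = p
>         .apply(PubsubIO.readMessagesWithAttributes()
>             .withTimestampAttribute("ts")
>             .fromSubscription(
>                 "projects/" + projectName + "/subscriptions/beamtest-sub"));
> {code}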
> However, there is another issue now: messages don't seem to be getting 
> acked. I keep getting redeliveries of old messages, and the Stackdriver 
> metrics show that num_undelivered_messages (the unacked message count) keeps 
> increasing. The documentation says messages are acked as soon as a 
> GroupByKey or another ParDo happens, and that is happening, since I am 
> seeing the windowed GroupByKey output, but acks don't seem to be happening.



