[ https://issues.apache.org/jira/browse/BEAM-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717597#comment-16717597 ]
Maximilian Michels commented on BEAM-4520:
------------------------------------------

We should at least WARN or throw an exception if the source needs checkpointing and we are not configured to run with checkpointing.

> No messages delivered after a while with PubsubIO
> -------------------------------------------------
>
>                 Key: BEAM-4520
>                 URL: https://issues.apache.org/jira/browse/BEAM-4520
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp, runner-flink
>    Affects Versions: 2.4.0
>            Reporter: Hrish
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> I am running the following Beam pipeline code locally, with the FlinkRunner. PubsubIO is used to read messages from a topic. I have a separate thread that publishes messages to the topic at regular intervals (every 30 seconds) and also sets the "ts" attribute, which is used later to derive the event time.
> Custom transform to convert to a KV pair:
> {code:java}
> private static class PubSubMessageGrouper extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     PubsubMessage element = c.element();
>     KV<String, PubsubMessage> kv = KV.of(element.getAttribute("key"), element);
>     c.output(kv);
>   }
> }
> {code}
> Note that "key" is a key set in the message attributes earlier in the publisher thread. The intent is to group the messages downstream by this key.
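[Editor's note: for readers without a Beam environment handy, here is a plain-Java sketch of what the DoFn above combined with the downstream GroupByKey computes, ignoring windowing. The `Msg` class is a hypothetical stand-in for Beam's PubsubMessage; only the grouping-by-attribute logic is illustrated.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Main {
    // Hypothetical stand-in for Beam's PubsubMessage: a payload plus string attributes.
    static final class Msg {
        final String payload;
        final Map<String, String> attributes;
        Msg(String payload, Map<String, String> attributes) {
            this.payload = payload;
            this.attributes = attributes;
        }
    }

    // What PubSubMessageGrouper followed by GroupByKey computes, modulo windowing:
    // messages bucketed by their "key" attribute.
    static Map<String, List<Msg>> groupByKeyAttribute(List<Msg> input) {
        Map<String, List<Msg>> grouped = new LinkedHashMap<>();
        for (Msg m : input) {
            grouped.computeIfAbsent(m.attributes.get("key"), k -> new ArrayList<>()).add(m);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Msg> msgs = Arrays.asList(
            new Msg("a", Map.of("key", "k1", "ts", "1000")),
            new Msg("b", Map.of("key", "k2", "ts", "2000")),
            new Msg("c", Map.of("key", "k1", "ts", "3000")));
        Map<String, List<Msg>> grouped = groupByKeyAttribute(msgs);
        System.out.println(grouped.get("k1").size()); // 2 messages share key "k1"
        System.out.println(grouped.get("k2").size()); // 1 message has key "k2"
    }
}
```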
> Pipeline code:
> {code:java}
> PCollection<PubsubMessage> pubsubColl = p
>     .apply(PubsubIO.readMessagesWithAttributes()
>         .withTimestampAttribute("ts")
>         .fromTopic("projects/" + projectName + "/topics/beamtest"));
> PCollection<KV<String, PubsubMessage>> idfied =
>     pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));
> PCollection<KV<String, PubsubMessage>> windowed = idfied
>     .apply(Window.<KV<String, PubsubMessage>>into(FixedWindows.of(Duration.standardSeconds(15)))
>         .triggering(
>             Repeatedly.forever(
>                 AfterWatermark.pastEndOfWindow()))
>         .withAllowedLateness(Duration.standardSeconds(15))
>         .discardingFiredPanes());
> PCollection<KV<String, Iterable<PubsubMessage>>> grouped =
>     windowed.apply(GroupByKey.create());
> grouped.apply(ParDo.of(new KVPrinter()));
> {code}
> The transforms are not chained, for ease of reading. The KVPrinter transform at the end just prints out the messages received from the GroupByKey; it will be replaced by actual code once I get this running.
> When I run this, the trigger does not fire for quite some time (a couple of minutes or longer). When it finally fires, I see that some of the messages are never received (in the final step), no matter how long I keep it running. The Pub/Sub statistics in my GCP/Stackdriver dashboard show that there is a backlog of undelivered messages.
> Is this due to the internal watermark that PubsubIO uses? My intention here is to make sure that all messages are processed in the GroupByKey, including late ones within the allowed lateness window.
> Note that if I remove the GroupByKey and just print the messages after the windowing, I can see all the messages.
> --------
> An update: I switched the .fromTopic to a .fromSubscription, with a pre-created subscription, and things have started working, as in, I can see messages being delivered now.
> The only difference I can see is that the automatically created subscription from within PubsubIO has an ack deadline of 60 seconds, whereas I created mine with 600 seconds (the max allowed by Pub/Sub).
> However, there is another issue now: messages don't seem to be getting acked. I keep getting redeliveries of old messages, and the Stackdriver metrics show that num_undelivered_messages (the unacked message count) keeps increasing. The documentation says that messages will be acked as soon as a GroupByKey or another ParDo happens, and that is happening, since I am seeing the windowed GroupByKey output, but acks don't seem to be happening.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
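[Editor's note: the comment at the top points at a plausible cause of the unacked messages: with the Flink runner, PubsubIO finalizes (acks) its reads only when a checkpoint completes, so a pipeline launched without checkpointing enabled may never ack and will see redeliveries. A minimal sketch of enabling checkpointing, assuming the standard Beam FlinkPipelineOptions flags; the jar name is hypothetical.]

```
# --checkpointingInterval is in milliseconds; -1 (the default) disables checkpointing.
java -jar my-pipeline.jar \
  --runner=FlinkRunner \
  --streaming=true \
  --checkpointingInterval=10000
```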