[
https://issues.apache.org/jira/browse/BEAM-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Maximilian Michels reassigned BEAM-4520:
----------------------------------------
Assignee: Maximilian Michels
> No messages delivered after a while with PubsubIO
> -------------------------------------------------
>
> Key: BEAM-4520
> URL: https://issues.apache.org/jira/browse/BEAM-4520
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp, runner-flink
> Affects Versions: 2.4.0
> Reporter: Hrish
> Assignee: Maximilian Michels
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> I am running the following Beam pipeline code locally, with the FlinkRunner.
> PubsubIO is used to read messages from a topic. I have a separate thread that
> publishes messages to the topic at regular intervals (every 30 seconds) and
> also sets the "ts" attribute which is used later to derive the event time.
> Custom transform to convert each message to a KV pair:
> {code:java}
> private static class PubSubMessageGrouper
>     extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     PubsubMessage element = c.element();
>     KV<String, PubsubMessage> kv = KV.of(element.getAttribute("key"), element);
>     c.output(kv);
>   }
> }
> {code}
> Note that "key" is a message attribute set earlier by the publisher thread.
> The intent is to group the messages downstream by this key.
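For readers reproducing this, here is a minimal sketch of the attribute map the publisher thread might attach to each message. It assumes the "ts" value is written as milliseconds since the Unix epoch in decimal form, which is one of the formats PubsubIO's withTimestampAttribute accepts (RFC 3339 strings being the other). The class name, helper name, and sample key are hypothetical and do not come from the report; the actual publish call is omitted.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

class AttributeSketch {
    // Hypothetical helper: builds the attributes attached to one outgoing message.
    static Map<String, String> buildAttributes(String key, Instant eventTime) {
        Map<String, String> attrs = new HashMap<>();
        // Grouping key, later read with element.getAttribute("key").
        attrs.put("key", key);
        // Event time as epoch milliseconds in decimal form (assumed format).
        attrs.put("ts", Long.toString(eventTime.toEpochMilli()));
        return attrs;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = buildAttributes("sensor-1", Instant.now());
        System.out.println("key=" + attrs.get("key") + " ts=" + attrs.get("ts"));
    }
}
```

A message published without the "key" attribute would make getAttribute("key") return null in the transform above, so it is worth checking that every message carries both attributes.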
> Pipeline code:
> {code:java}
> PCollection<PubsubMessage> pubsubColl = p
>     .apply(PubsubIO.readMessagesWithAttributes()
>         .withTimestampAttribute("ts")
>         .fromTopic("projects/" + projectName + "/topics/beamtest"));
> PCollection<KV<String, PubsubMessage>> idfied =
>     pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));
> PCollection<KV<String, PubsubMessage>> windowed = idfied
>     .apply(Window.<KV<String, PubsubMessage>>into(
>             FixedWindows.of(Duration.standardSeconds(15)))
>         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>         .withAllowedLateness(Duration.standardSeconds(15))
>         .discardingFiredPanes());
> PCollection<KV<String, Iterable<PubsubMessage>>> grouped =
>     windowed.apply(GroupByKey.create());
> grouped.apply(ParDo.of(new KVPrinter()));
> {code}
> The transforms are left unchained for ease of reading. The KVPrinter transform
> at the end just prints the messages received from the GroupByKey; it will be
> replaced by actual code once I get this running.
> When I run this, the trigger does not fire for quite some time (a couple of
> minutes or longer). When it finally fires, some of the messages never reach
> the final step, no matter how long I keep the pipeline running. The Pubsub
> statistics in my GCP/Stackdriver dashboard show that there is a backlog of
> undelivered messages.
> Is this due to the internal watermark that PubsubIO uses? My intention here
> is to make sure that all messages are processed in the groupby, including
> late ones within the allowed lateness window.
> Note that if I remove the GroupByKey, and just print the messages after the
> windowing, I can see all the messages.
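On the lateness question, a minimal sketch of the arithmetic behind 15-second fixed windows with 15 seconds of allowed lateness may help. It is a simplified, standalone model, not Beam code: it assumes windows are aligned to the epoch, and that an element is dropped once the watermark has passed the last instant of its window plus the allowed lateness. The class and method names are hypothetical.

```java
import java.time.Duration;

class WindowSketch {
    // Start of the epoch-aligned fixed window containing the timestamp.
    static long windowStart(long tsMillis, Duration size) {
        return tsMillis - Math.floorMod(tsMillis, size.toMillis());
    }

    // True if, under this simplified model, an element with the given event
    // time would be dropped as too late at the given watermark.
    static boolean isDropped(long tsMillis, long watermarkMillis,
                             Duration size, Duration allowedLateness) {
        long windowEnd = windowStart(tsMillis, size) + size.toMillis();
        long lastInstant = windowEnd - 1; // window is [start, end)
        return watermarkMillis > lastInstant + allowedLateness.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(15);
        Duration lateness = Duration.ofSeconds(15);
        long ts = 37_000L; // falls in the window [30000, 45000)
        System.out.println("window start: " + windowStart(ts, size));
        System.out.println("dropped at watermark 50000: "
            + isDropped(ts, 50_000L, size, lateness));
        System.out.println("dropped at watermark 60000: "
            + isDropped(ts, 60_000L, size, lateness));
    }
}
```

Under this model, a message is only at risk once the watermark is more than 15 seconds past the end of its window, so comparing the "ts" values of the missing messages against the watermark at arrival time is one way to tell dropped late data apart from messages that were never delivered.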
> --------
> An update: I switched from .fromTopic to .fromSubscription with a
> pre-created subscription, and messages are now being delivered. The only
> difference I can see is that the subscription PubsubIO creates automatically
> has an ack deadline of 60 seconds, whereas I created mine with 600 seconds
> (the maximum allowed by Pubsub).
> However, there is another issue now - messages don't seem to be getting
> acked. I keep getting redeliveries of old messages, and the Stackdriver
> metrics show that the num_undelivered_messages (unacked messages count) keeps
> on increasing. The documentation says that messages will be acked as soon as
> a GroupByKey or another ParDo happens, and that's happening since I am seeing
> the windowed groupby-s, but acks don't seem to be happening.
>
>