This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ebf2aac Merge pull request #15537 from [BEAM-12908] Add a sleep to
the IT after topic creation to wait for propagation
ebf2aac is described below
commit ebf2aacf37b97fc85b167271f184f61f5b06ddc3
Author: dpcollins-google <[email protected]>
AuthorDate: Tue Sep 21 23:22:35 2021 -0400
Merge pull request #15537 from [BEAM-12908] Add a sleep to the IT after
topic creation to wait for propagation
* [BEAM-12908] Add a 10 second sleep to the IT after topic creation to wait
for propagation
* [BEAM-12908] Add a 10 second sleep to the IT after topic creation to wait
for propagation
---
.../apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
index e242942..d6e9f8b 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -236,7 +236,21 @@ public class ReadWriteIT {
pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
TopicPath topic = createTopic(getProject(pipeline.getOptions()));
- SubscriptionPath subscription = createSubscription(topic);
+ SubscriptionPath subscription = null;
+ Exception lastException = null;
+ for (int i = 0; i < 30; ++i) {
+ // Sleep for topic creation to propagate.
+ Thread.sleep(1000);
+ try {
+ subscription = createSubscription(topic);
+ } catch (Exception e) {
+ lastException = e;
+ LOG.info("Retrying exception on subscription creation.", e);
+ }
+ }
+ if (subscription == null) {
+ throw lastException;
+ }
// Publish some messages
writeMessages(topic, pipeline);