This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 891349cae44 added a simple pub/sub topic for testing (#27610)
891349cae44 is described below
commit 891349cae44c27014d7d09cb59a2d719d8198b54
Author: liferoad <[email protected]>
AuthorDate: Tue Jul 25 16:11:19 2023 -0400
added a simple pub/sub topic for testing (#27610)
* added a simple pub/sub topic for testing
* update the code based on comments
---------
Co-authored-by: xqhu <[email protected]>
---
.test-infra/pubsub/README.md | 37 +++++++++++++++++
.test-infra/pubsub/gcs_image_looper.py | 69 +++++++++++++++++++++++++++++++
.test-infra/pubsub/test_image_looper.py | 72 +++++++++++++++++++++++++++++++++
3 files changed, 178 insertions(+)
diff --git a/.test-infra/pubsub/README.md b/.test-infra/pubsub/README.md
new file mode 100644
index 00000000000..509c1006e00
--- /dev/null
+++ b/.test-infra/pubsub/README.md
@@ -0,0 +1,37 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Overview
+
+This folder contains Python scripts to create a Pub/Sub topic under
+the GCP project `apache-beam-testing` and test the topic.
+The created topic is
`projects/apache-beam-testing/topics/Imagenet_openimage_50k_benchmark`.
+
+# Create the topic `Imagenet_openimage_50k_benchmark`
+
+- Create one VM to run `gcs_image_looper.py`.
+ The VM `pubsub-test-do-not-delete` was already created under
`apache-beam-testing`.
+ Keep the script running to continuously publish data.
+- You may need to run `gcloud auth application-default login` to authenticate.
+- You may need to run `pip install google-cloud-core google-cloud-pubsub
google-cloud-storage` to install the dependencies.
+- You must make `Imagenet_openimage_50k_benchmark` public by granting
`allAuthenticatedUsers` the Pub/Sub Subscriber role.
+
+# Test the topic by subscribing to it
+
+- Run `test_image_looper.py` to check whether you could get any data.
diff --git a/.test-infra/pubsub/gcs_image_looper.py
b/.test-infra/pubsub/gcs_image_looper.py
new file mode 100644
index 00000000000..a00e36f9ae7
--- /dev/null
+++ b/.test-infra/pubsub/gcs_image_looper.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""This executable reads image file paths from a file in a GCS bucket and
+keeps publishing randomly picked ones to a Pub/Sub topic."""
+import random
+import time
+
+from google.api_core.exceptions import AlreadyExists
+from google.cloud import pubsub_v1
+from google.cloud import storage
+
+# GCP project that owns the Pub/Sub topic, and the name of that topic.
+project_id = "apache-beam-testing"
+topic_name = "Imagenet_openimage_50k_benchmark"
+
+# GCS bucket and object holding the image file paths, one per line.
+gcs_bucket = "apache-beam-ml"
+image_file_path = "testing/inputs/openimage_50k_benchmark.txt"
+
+# Publishing rate of the endless loop at the bottom of this script.
+num_images_per_second = 5
+
+publisher = pubsub_v1.PublisherClient()
+topic_path = publisher.topic_path(project_id, topic_name)
+
+
+class ImageLooper(object):
+  """Serve random image file paths read from a text file in a GCS bucket.
+
+  The file is downloaded once, when the instance is created, and split into
+  lines; every non-empty line is treated as one image file path.
+  """
+  # Lines of the GCS file; set per instance by _read_gcs_file().
+  content = ""
+  # Not read or written by this class.
+  cursor = 0
+
+  def __init__(self, filename):
+    """Load the lines of ``filename`` from the bucket ``gcs_bucket``.
+
+    Args:
+      filename: name of the text object inside ``gcs_bucket``.
+
+    Raises:
+      FileNotFoundError: if the object does not exist in the bucket.
+    """
+    self._read_gcs_file(filename)
+
+  def get_next_image(self):
+    """Returns a randomly picked non-empty line of the loaded file.
+
+    Raises:
+      ValueError: if no non-empty line is loaded. Without this check the
+        retry loop below would never terminate.
+    """
+    if not any(self.content):
+      raise ValueError("No image file path is available.")
+    next_image = ""
+    # Empty lines (e.g. the one after a trailing newline) are skipped by
+    # drawing again.
+    while not next_image:
+      next_image = random.choice(self.content)
+    return next_image
+
+  def _read_gcs_file(self, filename):
+    """Download ``filename`` from ``gcs_bucket`` and store its lines."""
+    client = storage.Client()
+    bucket = client.get_bucket(gcs_bucket)
+    blob = bucket.get_blob(filename)
+    # get_blob() returns None instead of raising when the object is missing.
+    if blob is None:
+      raise FileNotFoundError(
+          f"gs://{gcs_bucket}/{filename} does not exist.")
+    # download_as_bytes() replaces the deprecated download_as_string().
+    self.content = blob.download_as_bytes().decode("utf-8").split('\n')
+
+
+# Make sure the topic exists; an already existing topic is reused as is.
+try:
+  publisher.create_topic(request={"name": topic_path})
+except AlreadyExists:
+  pass
+
+looper = ImageLooper(image_file_path)
+# Pause between two consecutive publish calls.
+publish_interval = 1 / num_images_per_second
+while True:
+  payload = looper.get_next_image().encode("utf-8")
+  publisher.publish(topic_path, data=payload)
+  time.sleep(publish_interval)
diff --git a/.test-infra/pubsub/test_image_looper.py
b/.test-infra/pubsub/test_image_looper.py
new file mode 100644
index 00000000000..c5fcf9f652c
--- /dev/null
+++ b/.test-infra/pubsub/test_image_looper.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""This executable tests the Pub/Sub topic created by gcs_image_looper.py."""
+
+from concurrent.futures import TimeoutError
+
+from google.cloud import pubsub_v1
+from google.api_core.exceptions import AlreadyExists
+
+project_id = "apache-beam-testing"
+topic_id = "Imagenet_openimage_50k_benchmark"
+subscription_id = "test-image-looper"
+
+# Seconds to wait on the streaming pull before it is cancelled.
+timeout = 3.0
+# Decoded payload of every message handed to callback().
+total_images = []
+
+publisher = pubsub_v1.PublisherClient()
+subscriber = pubsub_v1.SubscriberClient()
+topic_path = publisher.topic_path(project_id, topic_id)
+subscription_path = subscriber.subscription_path(project_id, subscription_id)
+
+# Always start from a fresh subscription: an existing one with the same
+# name is deleted and created again.
+subscription_request = {"name": subscription_path, "topic": topic_path}
+try:
+  subscription = subscriber.create_subscription(request=subscription_request)
+  print(f"Subscription created: {subscription}")
+except AlreadyExists:
+  subscriber.delete_subscription(request={"subscription": subscription_path})
+  subscription = subscriber.create_subscription(request=subscription_request)
+  print(f"Subscription recreated: {subscription}")
+
+
+def callback(message: pubsub_v1.subscriber.message.Message) -> None:
+  """Record the decoded payload of ``message`` and acknowledge it."""
+  payload = message.data.decode()
+  total_images.append(payload)
+  message.ack()
+
+
+# Listen on the subscription for `timeout` seconds, print what was received,
+# then remove the subscription again.
+try:
+  streaming_pull_future = subscriber.subscribe(subscription_path,
+                                               callback=callback)
+  print(f"Listening for messages on {subscription_path}..\n")
+
+  try:
+    # When `timeout` is not set, result() will block indefinitely,
+    # unless an exception is encountered first.
+    streaming_pull_future.result(timeout=timeout)
+  except TimeoutError:
+    streaming_pull_future.cancel()  # Trigger the shutdown.
+    streaming_pull_future.result()  # Block until the shutdown is complete.
+  print("Results: \n", total_images)
+finally:
+  # Clean up in `finally` so that the subscription is also removed when the
+  # pull fails with something other than the expected timeout.
+  subscriber.delete_subscription(request={"subscription": subscription_path})
+  print(f"Subscription deleted: {subscription_path}.")
+  # Close the client's underlying channel once it is no longer needed.
+  subscriber.close()
\ No newline at end of file