dpcollins-google commented on a change in pull request #15727:
URL: https://github.com/apache/beam/pull/15727#discussion_r733951398
##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
##########
@@ -0,0 +1,79 @@
+from apache_beam.transforms import Map, PTransform
+from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal
+
+try:
+ from google.cloud import pubsublite
+except ImportError:
+ pubsublite = None
+
+
+class ReadFromPubSubLite(PTransform):
+ """A ``PTransform`` for reading from Pub/Sub Lite."""
+
+ def __init__(
+ self,
+ subscription_path,
+ min_bundle_timeout=None,
+ deduplicate=None
+ ):
+ """Initializes ``ReadFromPubSubLite``.
+
+ Args:
+ subscription_path: Pub/Sub Lite Subscription in the form
+
"projects/<project>/locations/<location>/subscriptions/<subscription>".
+ min_bundle_timeout: The minimum wall time to pass before allowing
+ bundle closure. Setting this to too small of a value will result in
+ increased compute costs and lower throughput per byte. Immediate
+ timeouts (0) may be useful for testing.
+ deduplicate: Whether to deduplicate messages based on the value of
+ the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False.
+ """
+ super().__init__()
+ self._source = ReadExternal(
Review comment:
It's easier to construct the external transform in __init__; that way you don't need to store the
constructor arguments as instance variables. Is this okay?
##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
##########
@@ -0,0 +1,79 @@
+from apache_beam.transforms import Map, PTransform
+from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal
+
+try:
+ from google.cloud import pubsublite
+except ImportError:
+ pubsublite = None
+
+
+class ReadFromPubSubLite(PTransform):
+ """A ``PTransform`` for reading from Pub/Sub Lite."""
+
+ def __init__(
+ self,
+ subscription_path,
+ min_bundle_timeout=None,
+ deduplicate=None
+ ):
+ """Initializes ``ReadFromPubSubLite``.
+
+ Args:
+ subscription_path: Pub/Sub Lite Subscription in the form
+
"projects/<project>/locations/<location>/subscriptions/<subscription>".
+ min_bundle_timeout: The minimum wall time to pass before allowing
+ bundle closure. Setting this to too small of a value will result in
+ increased compute costs and lower throughput per byte. Immediate
+ timeouts (0) may be useful for testing.
+ deduplicate: Whether to deduplicate messages based on the value of
+ the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False.
+ """
+ super().__init__()
+ self._source = ReadExternal(
+ subscription_path=subscription_path,
+ min_bundle_timeout=min_bundle_timeout,
+ deduplicate=deduplicate
+ )
+
+ def expand(self, pvalue):
+ pcoll = pvalue.pipeline | self._source
+ pcoll.element_type = bytes
+ pcoll = pcoll | Map(pubsublite.SequencedMessage.deserialize)
+ pcoll.element_type = pubsublite.SequencedMessage
+ return pcoll
+
+
+class WriteToPubSubLite(PTransform):
+ """A ``PTransform`` for writing to Pub/Sub Lite."""
+
+ def __init__(
+ self,
+ topic_path,
+ add_uuids=None
+ ):
+ """Initializes ``WriteToPubSubLite``.
+
+ Args:
+ topic_path: A Pub/Sub Lite Topic path.
+ add_uuids: Whether to add uuids to the 'x-goog-pubsublite-dataflow-uuid'
+ uuid attribute. Defaults to False.
+ """
+ super().__init__()
+ self._source = WriteExternal(
Review comment:
It's easier to construct the external transform in __init__; that way you don't need to store the
constructor arguments as instance variables. Is this okay?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]