dpcollins-google commented on a change in pull request #15727:
URL: https://github.com/apache/beam/pull/15727#discussion_r733959939



##########
File path: sdks/python/setup.py
##########
@@ -188,6 +188,7 @@ def get_version():
     'google-auth>=1.18.0,<3',
     'google-cloud-datastore>=1.8.0,<2',
     'google-cloud-pubsub>=0.39.0,<2',
+    'google-cloud-pubsublite>=1.2.0,<2',

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
##########
@@ -0,0 +1,79 @@
+from apache_beam.transforms import Map, PTransform
+from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal
+
+try:
+  from google.cloud import pubsublite
+except ImportError:
+  pubsublite = None
+
+
+class ReadFromPubSubLite(PTransform):
+  """A ``PTransform`` for reading from Pub/Sub Lite."""

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
##########
@@ -0,0 +1,79 @@
+from apache_beam.transforms import Map, PTransform
+from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal
+
+try:
+  from google.cloud import pubsublite
+except ImportError:
+  pubsublite = None
+
+
+class ReadFromPubSubLite(PTransform):
+  """A ``PTransform`` for reading from Pub/Sub Lite."""
+
+  def __init__(
+      self,
+      subscription_path,
+      min_bundle_timeout=None,
+      deduplicate=None
+  ):
+    """Initializes ``ReadFromPubSubLite``.
+
+    Args:
+      subscription_path: Pub/Sub Lite Subscription in the form
+          "projects/<project>/locations/<location>/subscriptions/<subscription>".
+      min_bundle_timeout: The minimum wall time to pass before allowing
+          bundle closure. Setting this to too small of a value will result in
+          increased compute costs and lower throughput per byte. Immediate
+          timeouts (0) may be useful for testing.
+      deduplicate: Whether to deduplicate messages based on the value of
+          the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False.
+    """
+    super().__init__()
+    self._source = ReadExternal(
+      subscription_path=subscription_path,
+      min_bundle_timeout=min_bundle_timeout,
+      deduplicate=deduplicate
+    )
+
+  def expand(self, pvalue):
+    pcoll = pvalue.pipeline | self._source
+    pcoll.element_type = bytes
+    pcoll = pcoll | Map(pubsublite.SequencedMessage.deserialize)
+    pcoll.element_type = pubsublite.SequencedMessage
+    return pcoll
+
+
+class WriteToPubSubLite(PTransform):
+  """A ``PTransform`` for writing to Pub/Sub Lite."""

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
##########
@@ -0,0 +1,79 @@
+from apache_beam.transforms import Map, PTransform
+from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal
+
+try:
+  from google.cloud import pubsublite
+except ImportError:
+  pubsublite = None
+
+
+class ReadFromPubSubLite(PTransform):

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Google Pub/Sub Lite sources and sinks.
+
+This API is currently under development and is subject to change.
+"""
+
+# pytype: skip-file
+
+import typing
+
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+_ReadSchema = typing.NamedTuple(
+    '_ReadSchema',
+    [('subscription_path', str),
+     ('min_bundle_timeout', int),
+     ('deduplicate', bool)])
+
+
+def _default_io_expansion_service():
+  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+
+
+class ReadExternal(ExternalTransform):
+  """
+    An external PTransform which reads from Pub/Sub Lite and returns a
+    SequencedMessage as serialized bytes.
+
+    Experimental; no backwards compatibility guarantees.
+  """
+
+  def __init__(
+      self,
+      subscription_path,
+      min_bundle_timeout=None,
+      deduplicate=None
+  ):
+    """
+    Initializes a read operation from Pub/Sub Lite.
+
+    Args:
+      subscription_path: A Pub/Sub Lite Subscription path.
+      min_bundle_timeout: The minimum wall time to pass before allowing
+          bundle closure. Setting this to too small of a value will result in
+          increased compute costs and lower throughput per byte. Immediate
+          timeouts (0) may be useful for testing.
+      deduplicate: Whether to deduplicate messages based on the value of
+          the 'x-goog-pubsublite-dataflow-uuid' attribute.
+    """
+    if min_bundle_timeout is None:
+      min_bundle_timeout = 60 * 1000
+    if deduplicate is None:
+      deduplicate = False
+    super().__init__(
+      'beam:external:java:pubsublite:read:v1',
+      NamedTupleBasedPayloadBuilder(
+        _ReadSchema(
+          subscription_path=subscription_path,
+          min_bundle_timeout=min_bundle_timeout,
+          deduplicate=deduplicate)),
+      _default_io_expansion_service())
+
+
+_WriteSchema = typing.NamedTuple(
+    '_WriteSchema',
+    [
+        ('topic_path', str),
+        ('add_uuids', bool)
+    ])
+
+
+class WriteExternal(ExternalTransform):
+  """
+    An external PTransform which writes serialized PubSubMessage protos to
+    Pub/Sub Lite.
+
+    Experimental; no backwards compatibility guarantees.
+  """
+  def __init__(
+      self,
+      topic_path,
+      add_uuids=None
+  ):
+    """
+    Initializes a write operation to Pub/Sub Lite.
+
+    Args:
+      topic_path: A Pub/Sub Lite Topic path.
+      add_uuids: Whether to add uuids to the 'x-goog-pubsublite-dataflow-uuid'
+          uuid attribute.
+    """
+    if add_uuids is None:
+      add_uuids = False
+    super().__init__(
+        'beam:external:java:pubsublite:write:v1',
+        NamedTupleBasedPayloadBuilder(
+            _WriteSchema(
+                topic_path=topic_path,
+                add_uuids=add_uuids,
+            )),
+        _default_io_expansion_service())

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Google Pub/Sub Lite sources and sinks.
+
+This API is currently under development and is subject to change.
+"""
+
+# pytype: skip-file
+
+import typing
+
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+_ReadSchema = typing.NamedTuple(
+    '_ReadSchema',
+    [('subscription_path', str),
+     ('min_bundle_timeout', int),
+     ('deduplicate', bool)])
+
+
+def _default_io_expansion_service():
+  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+
+
+class ReadExternal(ExternalTransform):
+  """
+    An external PTransform which reads from Pub/Sub Lite and returns a
+    SequencedMessage as serialized bytes.
+
+    Experimental; no backwards compatibility guarantees.
+  """
+
+  def __init__(
+      self,
+      subscription_path,
+      min_bundle_timeout=None,
+      deduplicate=None
+  ):
+    """
+    Initializes a read operation from Pub/Sub Lite.
+
+    Args:
+      subscription_path: A Pub/Sub Lite Subscription path.
+      min_bundle_timeout: The minimum wall time to pass before allowing
+          bundle closure. Setting this to too small of a value will result in
+          increased compute costs and lower throughput per byte. Immediate
+          timeouts (0) may be useful for testing.
+      deduplicate: Whether to deduplicate messages based on the value of
+          the 'x-goog-pubsublite-dataflow-uuid' attribute.
+    """
+    if min_bundle_timeout is None:
+      min_bundle_timeout = 60 * 1000
+    if deduplicate is None:
+      deduplicate = False
+    super().__init__(
+      'beam:external:java:pubsublite:read:v1',
+      NamedTupleBasedPayloadBuilder(
+        _ReadSchema(
+          subscription_path=subscription_path,
+          min_bundle_timeout=min_bundle_timeout,
+          deduplicate=deduplicate)),
+      _default_io_expansion_service())

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Google Pub/Sub Lite sources and sinks.
+
+This API is currently under development and is subject to change.
+"""
+
+# pytype: skip-file
+
+import typing
+
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+_ReadSchema = typing.NamedTuple(
+    '_ReadSchema',
+    [('subscription_path', str),
+     ('min_bundle_timeout', int),
+     ('deduplicate', bool)])
+
+
+def _default_io_expansion_service():
+  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Google Pub/Sub Lite sources and sinks.
+
+This API is currently under development and is subject to change.

Review comment:
       Done.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ExternalTransformRegistrarImpl.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.io.gcp.pubsublite.internal.ExternalTransformConfig.ReadExternalBuilder;
+import org.apache.beam.sdk.io.gcp.pubsublite.internal.ExternalTransformConfig.WriteExternalBuilder;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalTransformRegistrarImpl implements ExternalTransformRegistrar {
+  public static final String WRITE_URN = "beam:external:java:pubsublite:write:v1";

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to