damccorm commented on code in PR #29782:
URL: https://github.com/apache/beam/pull/29782#discussion_r1434488633


##########
sdks/python/apache_beam/transforms/enrichment_it_test.py:
##########
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from typing import NamedTuple
+from typing import Tuple
+from typing import Union
+
+import pytest
+import urllib3
+
+import apache_beam as beam
+from apache_beam.io.requestresponse import UserCodeExecutionException
+from apache_beam.io.requestresponse import UserCodeQuotaException
+from apache_beam.io.requestresponse_it_test import _PAYLOAD
+from apache_beam.io.requestresponse_it_test import EchoITOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms.enrichment import Enrichment
+from apache_beam.transforms.enrichment import EnrichmentSourceHandler
+
+
+class Request(NamedTuple):
+  """Simple request type to store id and payload for requests."""
+  id: str  # mock API quota id
+  payload: bytes  # byte payload
+
+
+class SampleHTTPEnrichment(EnrichmentSourceHandler[Request, beam.Row]):
+  """Implements ``EnrichmentSourceHandler`` to call the ``EchoServiceGrpc``'s
+  HTTP handler.
+  """
+  def __init__(self, url: str):
+    self.url = url + '/v1/echo'  # append path to the mock API.
+
+  def __call__(self, request: Request, *args, **kwargs):
+    """Overrides ``Caller``'s call method invoking the
+    ``EchoServiceGrpc``'s HTTP handler with an ``EchoRequest``, returning
+    either a successful ``EchoResponse`` or throwing either a
+    ``UserCodeExecutionException``, ``UserCodeTimeoutException``,
+    or a ``UserCodeQuotaException``.
+    """
+    try:
+      resp = urllib3.request(
+          "POST",
+          self.url,
+          json={
+              "id": request.id, "payload": str(request.payload, 'utf-8')
+          },
+          retries=False)
+
+      if resp.status < 300:
+        resp_body = resp.json()
+        resp_id = resp_body['id']
+        payload = resp_body['payload']
+        yield (
+            beam.Row(id=request.id, payload=request.payload),
+            beam.Row(id=resp_id, resp_payload=bytes(payload, 'utf-8')))
+
+      if resp.status == 429:  # Too Many Requests
+        raise UserCodeQuotaException(resp.reason)
+      elif resp.status != 200:
+        raise UserCodeExecutionException(resp.status, resp.reason, request)
+
+    except urllib3.exceptions.HTTPError as e:
+      raise UserCodeExecutionException(e)
+
+
+@pytest.mark.it_postcommit
+class TestEnrichment(unittest.TestCase):
+  options: Union[EchoITOptions, None] = None
+  client: Union[SampleHTTPEnrichment, None] = None
+
+  @classmethod
+  def setUpClass(cls) -> None:
+    cls.options = EchoITOptions()
+    http_endpoint_address = 'http://10.138.0.32:8080'
+    cls.client = SampleHTTPEnrichment(http_endpoint_address)
+
+  @classmethod
+  def _get_client_and_options(
+      cls) -> Tuple[SampleHTTPEnrichment, EchoITOptions]:
+    assert cls.options is not None
+    assert cls.client is not None
+    return cls.client, cls.options
+
+  def test_http_enrichment(self):
+    client, options = TestEnrichment._get_client_and_options()
+    req = Request(id=options.never_exceed_quota_id, payload=_PAYLOAD)
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      output = (
+          test_pipeline
+          | 'Create PCollection' >> beam.Create([req])
+          | 'Enrichment Transform' >> Enrichment(client))
+      self.assertIsNotNone(output)

Review Comment:
   I think we should have some more extensive testing here. Specifically:
   1) We should test that the join works (i.e., validate the output
PCollection).
   2) We should test a custom join.
   
   I'd feel even better if we could also validate some of the other behaviors
(e.g., the retry strategy), so that, from an interface perspective, we could
swap out RequestResponseIO (RRIO) if needed and be confident that everything
still works. That is not required if it is difficult, though.



##########
sdks/python/apache_beam/transforms/enrichment.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Callable
+from typing import Generic
+from typing import Optional
+from typing import TypeVar
+
+import apache_beam as beam
+from apache_beam.io.requestresponse import DEFAULT_TIMEOUT_SECS
+from apache_beam.io.requestresponse import CacheReader
+from apache_beam.io.requestresponse import CacheWriter
+from apache_beam.io.requestresponse import Caller
+from apache_beam.io.requestresponse import PreCallThrottler
+from apache_beam.io.requestresponse import Repeater
+from apache_beam.io.requestresponse import RequestResponseIO
+from apache_beam.io.requestresponse import ShouldBackOff
+
+__all__ = [
+    "EnrichmentSourceHandler",
+    "Enrichment",
+]
+
+InputT = TypeVar('InputT')
+OutputT = TypeVar('OutputT')
+
+
+def cross_join(element):
+  """cross_join performs a cross join between two `beam.Row` objects.
+
+    Joins the columns of the right `beam.Row` onto the left `beam.Row`.
+
+    Args:
+      element (Tuple): A tuple containing two beam.Row objects -
+        request and response.
+
+    Returns:
+      beam.Row: `beam.Row` containing the merged columns.
+  """
+  right_dict = element[1].as_dict()
+  left_dict = element[0].as_dict()
+  for k, v in right_dict.items():
+    left_dict[k] = v
+  return beam.Row(**left_dict)
+
+
+class EnrichmentSourceHandler(Caller[InputT, OutputT]):
+  """Wrapper class for :class:`apache_beam.io.requestresponse.Caller`.
+
+  Ensure that the implementation of ``__call__`` method returns a tuple
+  of `beam.Row`  objects.
+  """
+
+  pass
+
+
+class Enrichment(beam.PTransform[beam.PCollection[InputT],
+                                 beam.PCollection[OutputT]],
+                 Generic[InputT, OutputT]):
+  """A :class:`apache_beam.transforms.enrichment.Enrichment` transform to
+  enrich elements in a PCollection.
+
+  Uses the :class:`apache_beam.transforms.enrichment.EnrichmentSourceHandler`
+  to enrich elements by joining the metadata from external source.
+
+  Processes an input :class:`~apache_beam.pvalue.PCollection` of `beam.Row` by
+  applying a :class:`apache_beam.transforms.enrichment.EnrichmentSourceHandler`
+  to each element and returning the enriched
+  :class:`~apache_beam.pvalue.PCollection`.
+
+  Args:
+    source_handler: Handles source lookup and metadata retrieval.
+      Implements the
+      :class:`apache_beam.transforms.enrichment.EnrichmentSourceHandler`
+    join_fn: A lambda function to join original element with lookup metadata.
+      Defaults to `CROSS_JOIN`.
+    timeout: (Optional) timeout for source requests. Defaults to 30 seconds.
+    should_backoff: (Optional) backoff strategy function.
+    repeater: (Optional) retry Repeater.
+    cache_reader: (Optional) CacheReader for reading cache.
+    cache_writer: (Optional) CacheWriter for writing cache.
+    throttler: (Optional) Throttler mechanism to throttle source requests.
+  """
+  def __init__(
+      self,
+      source_handler: EnrichmentSourceHandler,
+      join_fn: Callable = cross_join,
+      timeout: Optional[float] = DEFAULT_TIMEOUT_SECS,
+      should_backoff: Optional[ShouldBackOff] = None,
+      repeater: Optional[Repeater] = None,
+      cache_reader: Optional[CacheReader] = None,
+      cache_writer: Optional[CacheWriter] = None,
+      throttler: Optional[PreCallThrottler] = None):

Review Comment:
   Let's remove them for now. That way, if something comes up (sickness, other 
priorities, etc.), this doesn't accidentally get left in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to