damccorm commented on code in PR #30307:
URL: https://github.com/apache/beam/pull/30307#discussion_r1491752330


##########
sdks/python/apache_beam/io/requestresponse_it_test.py:
##########
@@ -181,9 +176,132 @@ def test_request_response_io(self):
       output = (
           test_pipeline
           | 'Create PCollection' >> beam.Create([req])
-          | 'RRIO Transform' >> RequestResponseIO(client))
+          | 'RRIO Transform' >> RequestResponseIO(client)
+          | 'Validate' >> beam.ParDo(ValidateResponse()))
       self.assertIsNotNone(output)
 
 
+class ValidateCacheResponses(beam.DoFn):
+  """Validates that the responses are fetched from the cache."""
+  def process(self, element, *args, **kwargs):
+    if not element[1] or 'cached-' not in element[1]:
+      raise ValueError(
+          'responses not fetched from cache even though cache '
+          'entries are present.')
+
+
+class ValidateCallerResponses(beam.DoFn):
+  """Validates that the responses are fetched from the caller."""
+  def process(self, element, *args, **kwargs):
+    if not element[1] or 'ACK-' not in element[1]:
+      raise ValueError('responses not fetched from caller when they should.')
+
+
+class FakeCallerForCache(Caller[str, str]):
+  def __init__(self, use_cache: bool = False):
+    self.use_cache = use_cache
+
+  def __enter__(self):
+    pass
+
+  def __call__(self, element, *args, **kwargs):
+    if self.use_cache:
+      return None, None
+
+    return element, 'ACK-{element}'
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    pass
+
+
[email protected]_redis
+class TestRedisCache(unittest.TestCase):
+  def setUp(self) -> None:
+    self.retries = 3
+    self._start_container()
+
+  def test_rrio_cache_all_miss(self):
+    """Cache is empty so all responses are fetched from caller."""
+    caller = FakeCallerForCache()
+    req = ['redis', 'cachetools', 'memcache']
+    cache = RedisCache(
+        self.host,
+        self.port,
+        time_to_live=30,
+        request_coder=coders.StrUtf8Coder(),
+        response_coder=coders.StrUtf8Coder())
+    with TestPipeline(is_integration_test=True) as p:
+      _ = (
+          p
+          | beam.Create(req)
+          | RequestResponseIO(caller, cache=cache)
+          | beam.ParDo(ValidateCallerResponses()))

Review Comment:
   Can we test after the test pipeline that the requests do get written back to 
the cache by hitting redis directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to