tvalentyn commented on code in PR #30307:
URL: https://github.com/apache/beam/pull/30307#discussion_r1491723636


##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -251,75 +285,14 @@ def __init__(
     self.delay_secs = delay_secs
 
 
-class RequestResponseIO(beam.PTransform[beam.PCollection[RequestT],
-                                        beam.PCollection[ResponseT]]):
-  """A :class:`RequestResponseIO` transform to read and write to APIs.
-
-  Processes an input :class:`~apache_beam.pvalue.PCollection` of requests
-  by making a call to the API as defined in :class:`Caller`'s `__call__`
-  and returns a :class:`~apache_beam.pvalue.PCollection` of responses.
-  """
-  def __init__(
-      self,
-      caller: Caller[RequestT, ResponseT],
-      timeout: Optional[float] = DEFAULT_TIMEOUT_SECS,
-      should_backoff: Optional[ShouldBackOff] = None,
-      repeater: Repeater = ExponentialBackOffRepeater(),
-      cache_reader: Optional[CacheReader] = None,
-      cache_writer: Optional[CacheWriter] = None,
-      throttler: PreCallThrottler = DefaultThrottler(),
-  ):
-    """
-    Instantiates a RequestResponseIO transform.
-
-    Args:
-      caller (~apache_beam.io.requestresponse.Caller): an implementation of
-        `Caller` object that makes call to the API.
-      timeout (float): timeout value in seconds to wait for response from API.
-      should_backoff (~apache_beam.io.requestresponse.ShouldBackOff):
-        (Optional) provides methods for backoff.
-      repeater (~apache_beam.io.requestresponse.Repeater): provides method to
-        repeat failed requests to API due to service errors. Defaults to
-        :class:`apache_beam.io.requestresponse.ExponentialBackOffRepeater` to
-        repeat requests with exponential backoff.
-      cache_reader (~apache_beam.io.requestresponse.CacheReader): (Optional)
-        provides methods to read external cache.
-      cache_writer (~apache_beam.io.requestresponse.CacheWriter): (Optional)
-        provides methods to write to external cache.
-      throttler (~apache_beam.io.requestresponse.PreCallThrottler):
-        provides methods to pre-throttle a request. Defaults to
-        :class:`apache_beam.io.requestresponse.DefaultThrottler` for
-        client-side adaptive throttling using
-        :class:`apache_beam.io.components.adaptive_throttler.AdaptiveThrottler`
-    """
-    self._caller = caller
-    self._timeout = timeout
-    self._should_backoff = should_backoff
-    if repeater:
-      self._repeater = repeater
-    else:
-      self._repeater = NoOpsRepeater()
-    self._cache_reader = cache_reader
-    self._cache_writer = cache_writer
-    self._throttler = throttler
-
-  def expand(
-      self,
-      requests: beam.PCollection[RequestT]) -> beam.PCollection[ResponseT]:
-    # TODO(riteshghorse): handle Cache and Throttle PTransforms when available.
-    if isinstance(self._throttler, DefaultThrottler):
-      return requests | _Call(
-          caller=self._caller,
-          timeout=self._timeout,
-          should_backoff=self._should_backoff,
-          repeater=self._repeater,
-          throttler=self._throttler)
+class _FilterCacheReadFn(beam.DoFn):

Review Comment:
   Would `_FilterCacheHitsFn`/`_FilterCacheMissesFn` be appropriate names?



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -37,10 +47,33 @@
 RequestT = TypeVar('RequestT')
 ResponseT = TypeVar('ResponseT')
 
-DEFAULT_TIMEOUT_SECS = 30  # seconds
+# DEFAULT_TIMEOUT_SECS represents the time interval for completing the request
+# with external source.
+DEFAULT_TIMEOUT_SECS = 30
+
+# DEFAULT_TIME_TO_LIVE_SECS represents the total time to live for cache record.
+DEFAULT_TIME_TO_LIVE_SECS = 24 * 60 * 60
 
 _LOGGER = logging.getLogger(__name__)
 
+__all__ = [
+    'RequestResponseIO',
+    'ExponentialBackOffRepeater',
+    'DefaultThrottler',
+    'NoOpsRepeater',
+    'RedisCache',
+    'ReadFromRedis',
+    'WriteToRedis',
+    'RedisCaller',
+    'DEFAULT_TIMEOUT_SECS',

Review Comment:
   This is an internal constant. You can still reference it in other files, for 
example via `requestresponse.DEFAULT_TIMEOUT_SECS`, and you don't need to add it 
to `__all__` solely for the purpose of using it elsewhere. 
   For more information, see: 
https://stackoverflow.com/questions/44834/what-does-all-mean-in-python
   A good example of `__all__` usage in Beam is core transforms:
   
https://github.com/apache/beam/blob/52eedb2a20d83444ddc9187bca416598c168e9fd/sdks/python/apache_beam/transforms/__init__.py#L23
   
https://github.com/apache/beam/blob/52eedb2a20d83444ddc9187bca416598c168e9fd/sdks/python/apache_beam/transforms/core.py#L86
   
   This way, we can refer to `apache_beam.transforms.Flatten` via a "shorthand" 
that skips `.core`, but an internal class such as 
`apache_beam.transforms.core.DoFnContext` is not implicitly imported into 
the `transforms` package.



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +384,430 @@ def process(self, request: RequestT, *args, **kwargs):
   def teardown(self):
     self._metrics_collector.teardown_counter.inc(1)
     self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+  """Base Cache class for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+  For adding cache support to RequestResponseIO, implement this class.
+  """
+  @abc.abstractmethod
+  def get_read(self):
+    """get_read returns a PTransform that reads from the cache."""

Review Comment:
   nit: unlike Go, in Python it is not common to include the function name 
in the docstring. 
   `Returns a PTransform that reads...`



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +384,430 @@ def process(self, request: RequestT, *args, **kwargs):
   def teardown(self):
     self._metrics_collector.teardown_counter.inc(1)
     self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+  """Base Cache class for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+  For adding cache support to RequestResponseIO, implement this class.
+  """
+  @abc.abstractmethod
+  def get_read(self):
+    """get_read returns a PTransform that reads from the cache."""
+    pass
+
+  @abc.abstractmethod
+  def get_write(self):
+    """get_write returns a PTransform that writes to the cache."""
+    pass
+
+  @abc.abstractmethod
+  def has_request_coder(self) -> bool:

Review Comment:
   what happens when we don't have a coder configured?



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +384,430 @@ def process(self, request: RequestT, *args, **kwargs):
   def teardown(self):
     self._metrics_collector.teardown_counter.inc(1)
     self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+  """Base Cache class for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+  For adding cache support to RequestResponseIO, implement this class.
+  """
+  @abc.abstractmethod
+  def get_read(self):
+    """get_read returns a PTransform that reads from the cache."""
+    pass
+
+  @abc.abstractmethod
+  def get_write(self):
+    """get_write returns a PTransform that writes to the cache."""
+    pass
+
+  @abc.abstractmethod
+  def has_request_coder(self) -> bool:
+    """returns `True` if the request coder is present."""
+    pass
+
+  @abc.abstractmethod
+  def set_request_coder(self, request_coder: coders.Coder):
+    """sets the request coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_response_coder(self, response_coder: coders.Coder):
+    """sets the response coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_source_caller(self, caller: Caller):
+    """This method allows
+    :class:`apache_beam.io.requestresponse.RequestResponseIO` to pull
+    cache requests from respective callers."""
+    pass
+
+
+class _RedisMode(enum.Enum):
+  """
+  Mode of operation for redis cache when using
+  :class:`apache_beam.io.requestresponse.RedisCaller`.
+  """
+  READ = 0
+  WRITE = 1
+
+
+class RedisCaller(Caller):
+  """`RedisCaller` is an implementation of
+  :class:`apache_beam.io.requestresponse.Caller` for Redis client.
+
+  It provides the functionality for making requests to Redis server using
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+  """
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      kwargs: Optional[Dict[str, Any]] = None,
+      source_caller: Optional[Caller] = None,
+      mode: _RedisMode,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      mode: `_RedisMode` An enum type specifying the operational mode of
+        the `RedisCaller`.
+    """
+    self.host, self.port = host, port
+    self.time_to_live = time_to_live
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.kwargs = kwargs
+    self.source_caller = source_caller
+    self.mode = mode
+
+  def __enter__(self):
+    self.client = redis.Redis(self.host, self.port, **self.kwargs)
+
+  def __call__(self, element, *args, **kwargs):
+    if self.mode == _RedisMode.READ:
+      cache_request = self.source_caller.get_cache_request_key(element)
+      # check if the caller is a enrichment handler. EnrichmentHandler
+      # provides the request format for cache.
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element)
+
+      encoded_response = self.client.get(encoded_request)
+      if not encoded_response:
+        # no cache entry present for this request.
+        return element, None
+
+      if self.response_coder is None:
+        try:
+          response_dict = json.loads(encoded_response.decode('utf-8'))
+          response = beam.Row(**response_dict)
+        except Exception:
+          _LOGGER.warning(
+              'cannot decode response from redis cache for %s.' % element)
+          return element, None
+      else:
+        response = self.response_coder.decode(encoded_response)
+      return element, response
+    else:
+      cache_request = self.source_caller.get_cache_request_key(element[0])
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element[0])
+      if self.response_coder is None:
+        try:
+          encoded_response = json.dumps(element[1]._asdict()).encode('utf-8')
+        except Exception:
+          _LOGGER.warning(
+              'cannot encode response %s for %s to store in '
+              'redis cache.' % (element[1], element[0]))
+          return element
+      else:
+        encoded_response = self.response_coder.encode(element[1])
+      # Write to cache with TTL. Set nx to True to prevent overwriting for the
+      # same key.
+      self.client.set(
+          encoded_request, encoded_response, self.time_to_live, nx=True)
+      return element
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.client.close()
+
+
+class ReadFromRedis(beam.PTransform[beam.PCollection[RequestT],
+                                    beam.PCollection[ResponseT]]):
+  """ReadFromRedis is a `PTransform` that performs Redis cache read."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+    """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.READ)
+
+  def expand(
+      self,
+      requests: beam.PCollection[RequestT]) -> beam.PCollection[ResponseT]:
+    return requests | RequestResponseIO(self.redis_caller)
+
+
+class WriteToRedis(beam.PTransform[beam.PCollection[Tuple[RequestT, 
ResponseT]],
+                                   beam.PCollection[ResponseT]]):
+  """WriteToRedis is a `PTransfrom` that performs write to Redis cache."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.WRITE)
+
+  def expand(
+      self, elements: beam.PCollection[Tuple[RequestT, ResponseT]]
+  ) -> beam.PCollection[ResponseT]:
+    return elements | RequestResponseIO(self.redis_caller)
+
+
+def ensure_coders_exist(request_coder):
+  """checks if the coder exists to encode the request for caching."""
+  if not request_coder:
+    _LOGGER.warning(
+        'need request coder to be able to use'
+        'Cache with RequestResponseIO.')
+
+
+class RedisCache(Cache):
+  """RedisCache to configure cache using Redis for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta] = DEFAULT_TIME_TO_LIVE_SECS,
+      request_coder: Optional[coders.Coder] = None,
+      response_coder: Optional[coders.Coder] = None,
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,

Review Comment:
   Have we considered passing Redis params via `**kwargs` in the user-facing 
portion instead of requiring the user to pass a `kwargs` param?
   cc: @damccorm 



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +384,430 @@ def process(self, request: RequestT, *args, **kwargs):
   def teardown(self):
     self._metrics_collector.teardown_counter.inc(1)
     self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+  """Base Cache class for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+  For adding cache support to RequestResponseIO, implement this class.
+  """
+  @abc.abstractmethod
+  def get_read(self):
+    """get_read returns a PTransform that reads from the cache."""
+    pass
+
+  @abc.abstractmethod
+  def get_write(self):
+    """get_write returns a PTransform that writes to the cache."""
+    pass
+
+  @abc.abstractmethod
+  def has_request_coder(self) -> bool:
+    """returns `True` if the request coder is present."""
+    pass
+
+  @abc.abstractmethod
+  def set_request_coder(self, request_coder: coders.Coder):
+    """sets the request coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_response_coder(self, response_coder: coders.Coder):
+    """sets the response coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_source_caller(self, caller: Caller):
+    """This method allows
+    :class:`apache_beam.io.requestresponse.RequestResponseIO` to pull
+    cache requests from respective callers."""
+    pass
+
+
+class _RedisMode(enum.Enum):
+  """
+  Mode of operation for redis cache when using
+  :class:`apache_beam.io.requestresponse.RedisCaller`.
+  """
+  READ = 0
+  WRITE = 1
+
+
+class RedisCaller(Caller):
+  """`RedisCaller` is an implementation of
+  :class:`apache_beam.io.requestresponse.Caller` for Redis client.
+
+  It provides the functionality for making requests to Redis server using
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+  """
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      kwargs: Optional[Dict[str, Any]] = None,
+      source_caller: Optional[Caller] = None,
+      mode: _RedisMode,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      mode: `_RedisMode` An enum type specifying the operational mode of
+        the `RedisCaller`.
+    """
+    self.host, self.port = host, port
+    self.time_to_live = time_to_live
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.kwargs = kwargs
+    self.source_caller = source_caller
+    self.mode = mode
+
+  def __enter__(self):
+    self.client = redis.Redis(self.host, self.port, **self.kwargs)
+
+  def __call__(self, element, *args, **kwargs):
+    if self.mode == _RedisMode.READ:
+      cache_request = self.source_caller.get_cache_request_key(element)
+      # check if the caller is a enrichment handler. EnrichmentHandler
+      # provides the request format for cache.
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element)
+
+      encoded_response = self.client.get(encoded_request)
+      if not encoded_response:
+        # no cache entry present for this request.
+        return element, None
+
+      if self.response_coder is None:
+        try:
+          response_dict = json.loads(encoded_response.decode('utf-8'))
+          response = beam.Row(**response_dict)
+        except Exception:
+          _LOGGER.warning(
+              'cannot decode response from redis cache for %s.' % element)
+          return element, None
+      else:
+        response = self.response_coder.decode(encoded_response)
+      return element, response
+    else:
+      cache_request = self.source_caller.get_cache_request_key(element[0])
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element[0])
+      if self.response_coder is None:
+        try:
+          encoded_response = json.dumps(element[1]._asdict()).encode('utf-8')
+        except Exception:
+          _LOGGER.warning(
+              'cannot encode response %s for %s to store in '
+              'redis cache.' % (element[1], element[0]))
+          return element
+      else:
+        encoded_response = self.response_coder.encode(element[1])
+      # Write to cache with TTL. Set nx to True to prevent overwriting for the
+      # same key.
+      self.client.set(
+          encoded_request, encoded_response, self.time_to_live, nx=True)
+      return element
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.client.close()
+
+
+class ReadFromRedis(beam.PTransform[beam.PCollection[RequestT],
+                                    beam.PCollection[ResponseT]]):
+  """ReadFromRedis is a `PTransform` that performs Redis cache read."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+    """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.READ)
+
+  def expand(
+      self,
+      requests: beam.PCollection[RequestT]) -> beam.PCollection[ResponseT]:
+    return requests | RequestResponseIO(self.redis_caller)
+
+
+class WriteToRedis(beam.PTransform[beam.PCollection[Tuple[RequestT, 
ResponseT]],
+                                   beam.PCollection[ResponseT]]):
+  """WriteToRedis is a `PTransfrom` that performs write to Redis cache."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.WRITE)
+
+  def expand(
+      self, elements: beam.PCollection[Tuple[RequestT, ResponseT]]
+  ) -> beam.PCollection[ResponseT]:
+    return elements | RequestResponseIO(self.redis_caller)
+
+
+def ensure_coders_exist(request_coder):
+  """checks if the coder exists to encode the request for caching."""
+  if not request_coder:
+    _LOGGER.warning(
+        'need request coder to be able to use'
+        'Cache with RequestResponseIO.')
+
+
+class RedisCache(Cache):
+  """RedisCache to configure cache using Redis for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta] = DEFAULT_TIME_TO_LIVE_SECS,
+      request_coder: Optional[coders.Coder] = None,
+      response_coder: Optional[coders.Coder] = None,
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,

Review Comment:
   The same question applies to `def with_redis_cache`. If keeping it as is, maybe 
call it `redis_args` or something descriptive. 



##########
.github/trigger_files/beam_PostCommit_Python.json:
##########
@@ -0,0 +1 @@
+test

Review Comment:
   Is this to trigger a test suite? There should be a trigger command that you 
can use to trigger postcommits on a PR.



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -107,6 +140,17 @@ def __enter__(self):
   def __exit__(self, exc_type, exc_val, exc_tb):
     return None
 
+  def get_cache_request_key(self, request: RequestT):

Review Comment:
   There is some confusion as to what this function is supposed to return, as I 
read:
   
   > get_cache_request_key(
   ...
   > Returns the request to be cached
   ...
   > Implement this method to override the key for the cache.
   
   Would `get_cache_key` be a better name? Could we also add a type hint for 
the return result?
   



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -37,10 +47,33 @@
 RequestT = TypeVar('RequestT')
 ResponseT = TypeVar('ResponseT')
 
-DEFAULT_TIMEOUT_SECS = 30  # seconds
+# DEFAULT_TIMEOUT_SECS represents the time interval for completing the request
+# with external source.
+DEFAULT_TIMEOUT_SECS = 30

Review Comment:
   Consider 
   ```
   DEFAULT_REQUEST_TIMEOUT_SEC
   DEFAULT_CACHE_ENTRY_TTL_SEC
   ```



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -251,75 +285,14 @@ def __init__(
     self.delay_secs = delay_secs
 
 
-class RequestResponseIO(beam.PTransform[beam.PCollection[RequestT],
-                                        beam.PCollection[ResponseT]]):
-  """A :class:`RequestResponseIO` transform to read and write to APIs.
-
-  Processes an input :class:`~apache_beam.pvalue.PCollection` of requests
-  by making a call to the API as defined in :class:`Caller`'s `__call__`
-  and returns a :class:`~apache_beam.pvalue.PCollection` of responses.
-  """
-  def __init__(
-      self,
-      caller: Caller[RequestT, ResponseT],
-      timeout: Optional[float] = DEFAULT_TIMEOUT_SECS,
-      should_backoff: Optional[ShouldBackOff] = None,
-      repeater: Repeater = ExponentialBackOffRepeater(),
-      cache_reader: Optional[CacheReader] = None,
-      cache_writer: Optional[CacheWriter] = None,
-      throttler: PreCallThrottler = DefaultThrottler(),
-  ):
-    """
-    Instantiates a RequestResponseIO transform.
-
-    Args:
-      caller (~apache_beam.io.requestresponse.Caller): an implementation of
-        `Caller` object that makes call to the API.
-      timeout (float): timeout value in seconds to wait for response from API.
-      should_backoff (~apache_beam.io.requestresponse.ShouldBackOff):
-        (Optional) provides methods for backoff.
-      repeater (~apache_beam.io.requestresponse.Repeater): provides method to
-        repeat failed requests to API due to service errors. Defaults to
-        :class:`apache_beam.io.requestresponse.ExponentialBackOffRepeater` to
-        repeat requests with exponential backoff.
-      cache_reader (~apache_beam.io.requestresponse.CacheReader): (Optional)
-        provides methods to read external cache.
-      cache_writer (~apache_beam.io.requestresponse.CacheWriter): (Optional)
-        provides methods to write to external cache.
-      throttler (~apache_beam.io.requestresponse.PreCallThrottler):
-        provides methods to pre-throttle a request. Defaults to
-        :class:`apache_beam.io.requestresponse.DefaultThrottler` for
-        client-side adaptive throttling using
-        :class:`apache_beam.io.components.adaptive_throttler.AdaptiveThrottler`
-    """
-    self._caller = caller
-    self._timeout = timeout
-    self._should_backoff = should_backoff
-    if repeater:
-      self._repeater = repeater
-    else:
-      self._repeater = NoOpsRepeater()
-    self._cache_reader = cache_reader
-    self._cache_writer = cache_writer
-    self._throttler = throttler
-
-  def expand(
-      self,
-      requests: beam.PCollection[RequestT]) -> beam.PCollection[ResponseT]:
-    # TODO(riteshghorse): handle Cache and Throttle PTransforms when available.
-    if isinstance(self._throttler, DefaultThrottler):
-      return requests | _Call(
-          caller=self._caller,
-          timeout=self._timeout,
-          should_backoff=self._should_backoff,
-          repeater=self._repeater,
-          throttler=self._throttler)
+class _FilterCacheReadFn(beam.DoFn):
+  """A `DoFn` that emits to main output for successful cache read requests or
+  to the tagged output - `inputs` - otherwise."""
+  def process(self, element: Tuple[RequestT, ResponseT], *args, **kwargs):
+    if not element[1]:
+      yield pvalue.TaggedOutput('inputs', element[0])

Review Comment:
   Should the tagged output be named `cache_misses` or something like 
that?



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +384,430 @@ def process(self, request: RequestT, *args, **kwargs):
   def teardown(self):
     self._metrics_collector.teardown_counter.inc(1)
     self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+  """Base Cache class for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+  For adding cache support to RequestResponseIO, implement this class.
+  """
+  @abc.abstractmethod
+  def get_read(self):
+    """get_read returns a PTransform that reads from the cache."""
+    pass
+
+  @abc.abstractmethod
+  def get_write(self):
+    """get_write returns a PTransform that writes to the cache."""
+    pass
+
+  @abc.abstractmethod
+  def has_request_coder(self) -> bool:
+    """returns `True` if the request coder is present."""
+    pass
+
+  @abc.abstractmethod
+  def set_request_coder(self, request_coder: coders.Coder):

Review Comment:
   It is not common to use Java-style getters/setters in Python; see: 
   
https://stackoverflow.com/questions/2627002/whats-the-pythonic-way-to-use-getters-and-setters
   
https://stackoverflow.com/questions/5960337/how-to-create-abstract-properties-in-python-abstract-classes



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -107,6 +140,17 @@ def __enter__(self):
   def __exit__(self, exc_type, exc_val, exc_tb):
     return None
 
+  def get_cache_request_key(self, request: RequestT):
+    """Returns the request to be cached. This is how the
+    response will be looked up in the cache as well.
+
+    By default, entire request is cached as the key for the cache.
+    Implement this method to override the key for the cache.
+    For example, in `BigTableEnrichmentHandler`, the row key for the element

Review Comment:
   nit: I believe Bigtable is the preferred spelling. 



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -107,6 +140,17 @@ def __enter__(self):
   def __exit__(self, exc_type, exc_val, exc_tb):
     return None
 
+  def get_cache_request_key(self, request: RequestT):
+    """Returns the request to be cached. This is how the

Review Comment:
   A preferred syntax for multiline docstrings is to have a 1-liner summary + 
blank line + explanation, see: 
https://peps.python.org/pep-0257/#multi-line-docstrings 



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +384,430 @@ def process(self, request: RequestT, *args, **kwargs):
   def teardown(self):
     self._metrics_collector.teardown_counter.inc(1)
     self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+  """Base Cache class for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+  For adding cache support to RequestResponseIO, implement this class.
+  """
+  @abc.abstractmethod
+  def get_read(self):
+    """get_read returns a PTransform that reads from the cache."""
+    pass
+
+  @abc.abstractmethod
+  def get_write(self):
+    """get_write returns a PTransform that writes to the cache."""
+    pass
+
+  @abc.abstractmethod
+  def has_request_coder(self) -> bool:
+    """returns `True` if the request coder is present."""
+    pass
+
+  @abc.abstractmethod
+  def set_request_coder(self, request_coder: coders.Coder):
+    """sets the request coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_response_coder(self, response_coder: coders.Coder):
+    """sets the response coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_source_caller(self, caller: Caller):
+    """This method allows
+    :class:`apache_beam.io.requestresponse.RequestResponseIO` to pull
+    cache requests from respective callers."""
+    pass
+
+
+class _RedisMode(enum.Enum):
+  """
+  Mode of operation for redis cache when using
+  :class:`apache_beam.io.requestresponse.RedisCaller`.
+  """
+  READ = 0
+  WRITE = 1
+
+
+class RedisCaller(Caller):
+  """`RedisCaller` is an implementation of
+  :class:`apache_beam.io.requestresponse.Caller` for Redis client.
+
+  It provides the functionality for making requests to Redis server using
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+  """
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      kwargs: Optional[Dict[str, Any]] = None,
+      source_caller: Optional[Caller] = None,
+      mode: _RedisMode,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      mode: `_RedisMode` An enum type specifying the operational mode of
+        the `RedisCaller`.
+    """
+    self.host, self.port = host, port
+    self.time_to_live = time_to_live
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.kwargs = kwargs
+    self.source_caller = source_caller
+    self.mode = mode
+
+  def __enter__(self):
+    self.client = redis.Redis(self.host, self.port, **self.kwargs)
+
+  def __call__(self, element, *args, **kwargs):
+    if self.mode == _RedisMode.READ:
+      cache_request = self.source_caller.get_cache_request_key(element)
+      # check if the caller is a enrichment handler. EnrichmentHandler
+      # provides the request format for cache.
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element)
+
+      encoded_response = self.client.get(encoded_request)
+      if not encoded_response:
+        # no cache entry present for this request.
+        return element, None
+
+      if self.response_coder is None:
+        try:
+          response_dict = json.loads(encoded_response.decode('utf-8'))
+          response = beam.Row(**response_dict)
+        except Exception:
+          _LOGGER.warning(
+              'cannot decode response from redis cache for %s.' % element)
+          return element, None
+      else:
+        response = self.response_coder.decode(encoded_response)
+      return element, response
+    else:
+      cache_request = self.source_caller.get_cache_request_key(element[0])
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element[0])
+      if self.response_coder is None:
+        try:
+          encoded_response = json.dumps(element[1]._asdict()).encode('utf-8')
+        except Exception:
+          _LOGGER.warning(
+              'cannot encode response %s for %s to store in '
+              'redis cache.' % (element[1], element[0]))
+          return element
+      else:
+        encoded_response = self.response_coder.encode(element[1])
+      # Write to cache with TTL. Set nx to True to prevent overwriting for the
+      # same key.
+      self.client.set(
+          encoded_request, encoded_response, self.time_to_live, nx=True)
+      return element
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.client.close()
+
+
+class ReadFromRedis(beam.PTransform[beam.PCollection[RequestT],
+                                    beam.PCollection[ResponseT]]):
+  """ReadFromRedis is a `PTransform` that performs Redis cache read."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+    """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.READ)
+
+  def expand(
+      self,
+      requests: beam.PCollection[RequestT]) -> beam.PCollection[ResponseT]:
+    return requests | RequestResponseIO(self.redis_caller)
+
+
+class WriteToRedis(beam.PTransform[beam.PCollection[Tuple[RequestT, 
ResponseT]],
+                                   beam.PCollection[ResponseT]]):
+  """WriteToRedis is a `PTransfrom` that performs write to Redis cache."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.WRITE)
+
+  def expand(
+      self, elements: beam.PCollection[Tuple[RequestT, ResponseT]]
+  ) -> beam.PCollection[ResponseT]:
+    return elements | RequestResponseIO(self.redis_caller)
+
+
+def ensure_coders_exist(request_coder):
+  """checks if the coder exists to encode the request for caching."""
+  if not request_coder:
+    _LOGGER.warning(
+        'need request coder to be able to use'
+        'Cache with RequestResponseIO.')
+
+
+class RedisCache(Cache):
+  """RedisCache to configure cache using Redis for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta] = DEFAULT_TIME_TO_LIVE_SECS,
+      request_coder: Optional[coders.Coder] = None,
+      response_coder: Optional[coders.Coder] = None,
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      request_coder: (Optional[`coders.Coder`]) coder for encoding requests.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+    """
+    self._host = host
+    self._port = port
+    self._time_to_live = time_to_live
+    self._request_coder = request_coder
+    self._response_coder = response_coder
+    self._kwargs = kwargs if kwargs else {}
+    self._source_caller = None
+
+  def get_read(self):
+    """get_read returns a PTransform for reading from the cache."""
+    ensure_coders_exist(self._request_coder)
+    return ReadFromRedis(
+        self._host,
+        self._port,
+        time_to_live=self._time_to_live,
+        kwargs=self._kwargs,
+        request_coder=self._request_coder,
+        response_coder=self._response_coder,
+        source_caller=self._source_caller)
+
+  def get_write(self):
+    """get_write returns a PTransform for writing to the cache."""
+    ensure_coders_exist(self._request_coder)
+    return WriteToRedis(
+        self._host,
+        self._port,
+        time_to_live=self._time_to_live,
+        kwargs=self._kwargs,
+        request_coder=self._request_coder,
+        response_coder=self._response_coder,
+        source_caller=self._source_caller)
+
+  def has_request_coder(self) -> bool:
+    """returns True if the request coder exists."""
+    return self._request_coder is not None
+
+  def set_request_coder(self, request_coder: coders.Coder):
+    """sets the request coder to encode request for `RedisCache`."""
+    if request_coder and not self._request_coder:
+      self._request_coder = request_coder
+
+  def set_response_coder(self, response_coder: coders.Coder):
+    """sets the response coder to encode/decode response for `RedisCache`."""
+    if response_coder and not self._response_coder:

Review Comment:
   what is the rationale for the `not self._response_coder` part? 



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -37,10 +47,33 @@
 RequestT = TypeVar('RequestT')
 ResponseT = TypeVar('ResponseT')
 
-DEFAULT_TIMEOUT_SECS = 30  # seconds
+# DEFAULT_TIMEOUT_SECS represents the time interval for completing the request
+# with external source.
+DEFAULT_TIMEOUT_SECS = 30
+
+# DEFAULT_TIME_TO_LIVE_SECS represents the total time to live for cache record.
+DEFAULT_TIME_TO_LIVE_SECS = 24 * 60 * 60
 
 _LOGGER = logging.getLogger(__name__)
 
+__all__ = [
+    'RequestResponseIO',
+    'ExponentialBackOffRepeater',
+    'DefaultThrottler',
+    'NoOpsRepeater',
+    'RedisCache',
+    'ReadFromRedis',

Review Comment:
   What is user-facing API and what are internal classes here?
   
   Looks like RedisCache is user facing, so it should be here. What about other 
classes?
   
   If the transform `ReadFromRedis` is internal, it should be removed from 
`__all__` and renamed to `_ReadFromRedis`.
   



##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +384,430 @@ def process(self, request: RequestT, *args, **kwargs):
   def teardown(self):
     self._metrics_collector.teardown_counter.inc(1)
     self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+  """Base Cache class for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+  For adding cache support to RequestResponseIO, implement this class.
+  """
+  @abc.abstractmethod
+  def get_read(self):
+    """get_read returns a PTransform that reads from the cache."""
+    pass
+
+  @abc.abstractmethod
+  def get_write(self):
+    """get_write returns a PTransform that writes to the cache."""
+    pass
+
+  @abc.abstractmethod
+  def has_request_coder(self) -> bool:
+    """returns `True` if the request coder is present."""
+    pass
+
+  @abc.abstractmethod
+  def set_request_coder(self, request_coder: coders.Coder):
+    """sets the request coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_response_coder(self, response_coder: coders.Coder):
+    """sets the response coder to use with Cache."""
+    pass
+
+  @abc.abstractmethod
+  def set_source_caller(self, caller: Caller):
+    """This method allows
+    :class:`apache_beam.io.requestresponse.RequestResponseIO` to pull
+    cache requests from respective callers."""
+    pass
+
+
+class _RedisMode(enum.Enum):
+  """
+  Mode of operation for redis cache when using
+  :class:`apache_beam.io.requestresponse.RedisCaller`.
+  """
+  READ = 0
+  WRITE = 1
+
+
+class RedisCaller(Caller):
+  """`RedisCaller` is an implementation of
+  :class:`apache_beam.io.requestresponse.Caller` for Redis client.
+
+  It provides the functionality for making requests to Redis server using
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+  """
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      kwargs: Optional[Dict[str, Any]] = None,
+      source_caller: Optional[Caller] = None,
+      mode: _RedisMode,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      mode: `_RedisMode` An enum type specifying the operational mode of
+        the `RedisCaller`.
+    """
+    self.host, self.port = host, port
+    self.time_to_live = time_to_live
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.kwargs = kwargs
+    self.source_caller = source_caller
+    self.mode = mode
+
+  def __enter__(self):
+    self.client = redis.Redis(self.host, self.port, **self.kwargs)
+
+  def __call__(self, element, *args, **kwargs):
+    if self.mode == _RedisMode.READ:
+      cache_request = self.source_caller.get_cache_request_key(element)
+      # check if the caller is a enrichment handler. EnrichmentHandler
+      # provides the request format for cache.
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element)
+
+      encoded_response = self.client.get(encoded_request)
+      if not encoded_response:
+        # no cache entry present for this request.
+        return element, None
+
+      if self.response_coder is None:
+        try:
+          response_dict = json.loads(encoded_response.decode('utf-8'))
+          response = beam.Row(**response_dict)
+        except Exception:
+          _LOGGER.warning(
+              'cannot decode response from redis cache for %s.' % element)
+          return element, None
+      else:
+        response = self.response_coder.decode(encoded_response)
+      return element, response
+    else:
+      cache_request = self.source_caller.get_cache_request_key(element[0])
+      if cache_request:
+        encoded_request = self.request_coder.encode(cache_request)
+      else:
+        encoded_request = self.request_coder.encode(element[0])
+      if self.response_coder is None:
+        try:
+          encoded_response = json.dumps(element[1]._asdict()).encode('utf-8')
+        except Exception:
+          _LOGGER.warning(
+              'cannot encode response %s for %s to store in '
+              'redis cache.' % (element[1], element[0]))
+          return element
+      else:
+        encoded_response = self.response_coder.encode(element[1])
+      # Write to cache with TTL. Set nx to True to prevent overwriting for the
+      # same key.
+      self.client.set(
+          encoded_request, encoded_response, self.time_to_live, nx=True)
+      return element
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.client.close()
+
+
+class ReadFromRedis(beam.PTransform[beam.PCollection[RequestT],
+                                    beam.PCollection[ResponseT]]):
+  """ReadFromRedis is a `PTransform` that performs Redis cache read."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+    """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.READ)
+
+  def expand(
+      self,
+      requests: beam.PCollection[RequestT]) -> beam.PCollection[ResponseT]:
+    return requests | RequestResponseIO(self.redis_caller)
+
+
+class WriteToRedis(beam.PTransform[beam.PCollection[Tuple[RequestT, 
ResponseT]],
+                                   beam.PCollection[ResponseT]]):
+  """WriteToRedis is a `PTransfrom` that performs write to Redis cache."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta],
+      *,
+      kwargs: Optional[Dict[str, Any]] = None,
+      request_coder: Optional[coders.Coder],
+      response_coder: Optional[coders.Coder],
+      source_caller: Optional[Caller[RequestT, ResponseT]] = None,
+  ):
+    """
+    Args:
+      host (str): The hostname or IP address of the Redis server.
+      port (int): The port number of the Redis server.
+      time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+        records stored in Redis. Provide an integer (in seconds) or a
+        `datetime.timedelta` object.
+      kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+        are required to connect to your redis server. Same as `redis.Redis()`.
+      request_coder: (Optional[`coders.Coder`]) coder for requests stored
+        in Redis.
+      response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+        received from Redis.
+      source_caller: (Optional[`Caller`]): The source caller using this Redis
+        cache in case of fetching the cache request to store in Redis.
+      """
+    self.request_coder = request_coder
+    self.response_coder = response_coder
+    self.redis_caller = RedisCaller(
+        host,
+        port,
+        time_to_live,
+        request_coder=self.request_coder,
+        response_coder=self.response_coder,
+        kwargs=kwargs,
+        source_caller=source_caller,
+        mode=_RedisMode.WRITE)
+
+  def expand(
+      self, elements: beam.PCollection[Tuple[RequestT, ResponseT]]
+  ) -> beam.PCollection[ResponseT]:
+    return elements | RequestResponseIO(self.redis_caller)
+
+
+def ensure_coders_exist(request_coder):
+  """checks if the coder exists to encode the request for caching."""
+  if not request_coder:
+    _LOGGER.warning(
+        'need request coder to be able to use'
+        'Cache with RequestResponseIO.')
+
+
+class RedisCache(Cache):
+  """RedisCache to configure cache using Redis for
+  :class:`apache_beam.io.requestresponse.RequestResponseIO`."""
+  def __init__(
+      self,
+      host: str,
+      port: int,
+      time_to_live: Union[int, timedelta] = DEFAULT_TIME_TO_LIVE_SECS,
+      request_coder: Optional[coders.Coder] = None,
+      response_coder: Optional[coders.Coder] = None,
+      *,

Review Comment:
   Should we move the keyword-only asterisk to right after TTL, for consistency with the other constructors?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to