riteshghorse commented on code in PR #30307:
URL: https://github.com/apache/beam/pull/30307#discussion_r1491310060
##########
sdks/python/apache_beam/io/requestresponse.py:
##########
@@ -411,3 +387,440 @@ def process(self, request: RequestT, *args, **kwargs):
def teardown(self):
self._metrics_collector.teardown_counter.inc(1)
self._caller.__exit__(*sys.exc_info())
+
+
+class Cache(abc.ABC):
+ """Base Cache class for
+ :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+
+ For adding cache support to RequestResponseIO, implement this class.
+ """
+ @abc.abstractmethod
+ def get_read(self):
+ """get_read returns a PTransform that reads from the cache."""
+ pass
+
+ @abc.abstractmethod
+ def get_write(self):
+ """get_write returns a PTransform that writes to the cache."""
+ pass
+
+ @abc.abstractmethod
+ def has_request_coder(self) -> bool:
+ """returns `True` if the request coder is present."""
+ pass
+
+ @abc.abstractmethod
+ def set_request_coder(self, request_coder: coders.Coder):
+ """sets the request coder to use with Cache."""
+ pass
+
+ @abc.abstractmethod
+ def set_response_coder(self, response_coder: coders.Coder):
+ """sets the response coder to use with Cache."""
+ pass
+
+ @abc.abstractmethod
+ def set_source_caller(self, caller: Caller):
+ """(Internal-only) This method allows
+ :class:`apache_beam.io.requestresponse.RequestResponseIO` to pull
+ cache requests from respective callers."""
+ pass
+
+
+class _RedisMode(enum.Enum):
+ """
+ Mode of operation for redis cache when using
+ :class:`apache_beam.io.requestresponse.RedisCaller`.
+ """
+ READ = 0
+ WRITE = 1
+
+
+class RedisCaller(Caller):
+ """`RedisCaller` is an implementation of
+ :class:`apache_beam.io.requestresponse.Caller` for Redis client.
+
+ It provides the functionality for making requests to Redis server using
+ :class:`apache_beam.io.requestresponse.RequestResponseIO`.
+ """
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ time_to_live: Union[int, timedelta],
+ *,
+ request_coder: Optional[coders.Coder],
+ response_coder: Optional[coders.Coder],
+ kwargs: Optional[Dict[str, Any]] = None,
+ source_caller: Optional[Caller] = None,
+ mode: _RedisMode,
+ ):
+ """
+ Args:
+ host (str): The hostname or IP address of the Redis server.
+ port (int): The port number of the Redis server.
+ time_to_live: `(Union[int, timedelta])` The time-to-live (TTL) for
+ records stored in Redis. Provide an integer (in seconds) or a
+ `datetime.timedelta` object.
+ request_coder: (Optional[`coders.Coder`]) coder for requests stored
+ in Redis.
+ response_coder: (Optional[`coders.Coder`]) coder for decoding responses
+ received from Redis.
+ kwargs: Optional(Dict[str, Any]) additional keyword arguments that
+ are required to connect to your redis server. Same as `redis.Redis()`.
+ source_caller: (Optional[`Caller`]): The source caller using this Redis
+ cache in case of fetching the cache request to store in Redis.
+ mode: `_RedisMode` An enum type specifying the operational mode of
+ the `RedisCaller`.
+ """
+ self.host, self.port = host, port
+ self.time_to_live = time_to_live
+ self.request_coder = request_coder
+ self.response_coder = response_coder
+ self.kwargs = kwargs
+ self.source_caller = source_caller
+ self.mode = mode
+
+ def __enter__(self):
+ self.client = redis.Redis(self.host, self.port, **self.kwargs)
+
+ def __call__(self, element, *args, **kwargs):
+ if self.mode == _RedisMode.READ:
+ cache_request = self.source_caller.get_cache_request(element)
+ # check if the caller is a enrichment handler. EnrichmentHandler
+ # provides the request format for cache.
+ if cache_request:
+ encoded_request = self.request_coder.encode(cache_request)
+ else:
+ encoded_request = self.request_coder.encode(element)
Review Comment:
The write part is similar but it has tuples as input
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]