[
https://issues.apache.org/jira/browse/BEAM-12791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416670#comment-17416670
]
Willi Schinmeyer commented on BEAM-12791:
-----------------------------------------
I've looked into it, and currently {{ReadFromDatastore}} works like this:
{code:python}
return (
    pcoll.pipeline
    | 'UserQuery' >> Create([self._query])
    | 'SplitQuery' >> ParDo(
        ReadFromDatastore._SplitQueryFn(self._num_splits))
    | Reshuffle()
    | 'Read' >> ParDo(ReadFromDatastore._QueryFn())){code}
so it already works on PCollections of queries internally; there's just no way
to pass them in from outside at the moment.
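For context, the existing transform only accepts a single query at
construction time, roughly like this (the project and kind names are made up):
{code:python}
import apache_beam
from apache_beam.io.gcp.datastore.v1new import datastoreio, types

with apache_beam.Pipeline() as p:
    # The query is fixed when the pipeline is built -- it cannot depend
    # on data flowing through the pipeline.
    query = types.Query(kind='MyKind', project='my-project')
    entities = p | datastoreio.ReadFromDatastore(query)
{code}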
I've created my own copy that uses the input pcoll instead of creating one,
which seems to work just fine, with one drawback: There's no way to match the
entities in the output to the input queries. So I've also created wrappers for
{{_SplitQueryFn}} and {{_QueryFn}} that pass along generic userdata. Here's the
result:
{code:python}
from typing import Generic, Iterable, Tuple, TypeVar

import apache_beam
from apache_beam.io.gcp.datastore.v1new import datastoreio, types

T = TypeVar("T")
U = TypeVar("U")


@apache_beam.typehints.with_input_types(Tuple[T, types.Query])
@apache_beam.typehints.with_output_types(Tuple[T, types.Entity])
class ReadFromDatastoreDynamically(apache_beam.PTransform, Generic[T]):
    """
    A variant of datastoreio.ReadFromDatastore that takes queries from the
    incoming PCollection instead of on construction,
    and also carries generic userdata so you can identify outputs.
    """

    def __init__(self, num_splits=0):
        super().__init__()
        self._num_splits = num_splits

    def expand(self, input_or_inputs):
        # This is a composite transform involving the following:
        # 1. Take the queries from the input and apply a ``ParDo``
        #    that splits them into `num_splits` queries each, if possible.
        #
        #    If the value of `num_splits` is 0, the number of splits will be
        #    computed dynamically based on the size of the data for the
        #    `query`.
        #
        # 2. The resulting ``PCollection`` is sharded across workers using a
        #    ``Reshuffle`` operation.
        #
        # 3. In the third step, a ``ParDo`` reads entities for each query and
        #    outputs a ``PCollection[Tuple[T, Entity]]``.
        return (
            input_or_inputs
            | "SplitQuery" >> apache_beam.ParDo(
                ReadFromDatastoreDynamically._SplitQueryFn(self._num_splits))
            | apache_beam.Reshuffle()
            | "Read" >> apache_beam.ParDo(
                ReadFromDatastoreDynamically._QueryFn())
        )

    @apache_beam.typehints.with_input_types(Tuple[U, types.Query])
    @apache_beam.typehints.with_output_types(Tuple[U, types.Query])
    class _SplitQueryFn(apache_beam.DoFn, Generic[U]):
        """
        Extends the original _SplitQueryFn with userdata.
        """

        def __init__(self, num_splits):
            super().__init__()
            # Caution: we delegate to a protected inner class, this could
            # break on a Beam update!
            self.__delegate = datastoreio.ReadFromDatastore._SplitQueryFn(
                num_splits)

        def process(self, element: Tuple[U, types.Query], *args,
                    **kwargs) -> Iterable[Tuple[U, types.Query]]:
            key, query = element
            split_queries = self.__delegate.process(query, *args, **kwargs)
            return ((key, q) for q in split_queries)

    @apache_beam.typehints.with_input_types(Tuple[U, types.Query])
    @apache_beam.typehints.with_output_types(Tuple[U, types.Entity])
    class _QueryFn(apache_beam.DoFn, Generic[U]):
        """
        Extends the original _QueryFn with userdata.
        """

        def __init__(self):
            super().__init__()
            # Caution: we delegate to a protected inner class, this could
            # break on a Beam update!
            self.__delegate = datastoreio.ReadFromDatastore._QueryFn()

        def process(self, element: Tuple[U, types.Query], *args,
                    **kwargs) -> Iterable[Tuple[U, types.Entity]]:
            key, query = element
            return ((key, e) for e in self.__delegate.process(
                query, *args, **kwargs))
{code}
This relies on using internal DoFns, whose API could change in future Beam
versions, so I'd still prefer an official solution, but for now this will
satisfy my needs.
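For anyone who wants to try it, here's a minimal usage sketch; the kind,
project, and filter values are placeholders:
{code:python}
import apache_beam
from apache_beam.io.gcp.datastore.v1new import types

# Hypothetical scenario: look up entities for each incoming user id and
# keep the id as userdata so results can be matched back to their inputs.
def to_keyed_query(user_id):
    query = types.Query(
        kind='UserData',  # placeholder kind
        project='my-project',  # placeholder project
        filters=[('user_id', '=', user_id)])
    return (user_id, query)

with apache_beam.Pipeline() as p:
    results = (
        p
        | 'UserIds' >> apache_beam.Create(['alice', 'bob'])
        | 'ToQuery' >> apache_beam.Map(to_keyed_query)
        | 'ReadEntities' >> ReadFromDatastoreDynamically())
    # results is a PCollection[Tuple[str, types.Entity]].
{code}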
> Allow Queries from PCollection in
> datastore.v1new.datastoreio.ReadFromDatastore
> -------------------------------------------------------------------------------
>
> Key: BEAM-12791
> URL: https://issues.apache.org/jira/browse/BEAM-12791
> Project: Beam
> Issue Type: New Feature
> Components: io-py-gcp
> Reporter: Willi Schinmeyer
> Priority: P2
>
> As far as I can tell, I currently have to provide the Datastore Query when I
> initially create my pipeline. However, I would like to only fetch Entities
> corresponding to the data currently being processed by my pipeline. In other
> words, I would like to pass a PCollection of Queries (or at least Query
> Filters) to ReadFromDatastore.