Colin Le Nost created BEAM-9245:
-----------------------------------
Summary: Unable to pull Datastore Entity which contains dict properties
Key: BEAM-9245
URL: https://issues.apache.org/jira/browse/BEAM-9245
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Affects Versions: 2.18.0
Reporter: Colin Le Nost
Hello, we are facing a small bug while reading Datastore entities with the
ReadFromDatastore transform (Python SDK, 2.17 & 2.18).
We are unable to retrieve entities that contain a dictionary property. These
properties appear to be implicitly cast into embedded Datastore entities, and
when the connector tries to convert the nested entity's key, it breaks
(because that embedded entity has no key).
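To illustrate what we believe is happening, here is a minimal sketch (our assumption, not Beam code) showing that a dict property comes back from google-cloud-datastore as an embedded `entity.Entity` whose key is None, which is exactly what `Key.from_client_key` trips over:
{code:python}
# Minimal sketch of our assumption: a dict property is returned by
# google-cloud-datastore as an embedded Entity with no key.
from google.cloud.datastore import entity

nested = entity.Entity()                 # what a dict property round-trips as
nested.update({"field1": "my_field1"})

print(nested.key)                        # None
# Beam's types.Key.from_client_key(nested.key) then accesses
# client_key.flat_path on None, raising the AttributeError below.
{code}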
h2. Stacktrace
{code:python}
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py", line 269, in process
    yield types.Entity.from_client_entity(client_entity)
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 225, in from_client_entity
    value = Entity.from_client_entity(value)
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 219, in from_client_entity
    Key.from_client_key(client_entity.key),
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 156, in from_client_key
    return Key(client_key.flat_path, project=client_key.project,
AttributeError: 'NoneType' object has no attribute 'flat_path' [while running 'Read from datastore/Read']
{code}
h2. Code to reproduce
# Insert a Datastore entity using the create_datastore_entity function below
# Run the pipeline using the DirectRunner
{code:python}
import apache_beam as beam
from google.cloud import datastore
from apache_beam.io.gcp.datastore.v1new.types import Query
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions

DATASTORE_KIND = "my_entity_kind"
PROJECT_ID = "my_project_id"


def create_datastore_entity():
    client = datastore.Client(PROJECT_ID)
    key = client.key(DATASTORE_KIND, "my_task")
    entity = client.get(key=key)
    if entity is not None:
        raise Exception("Existing entity")
    else:
        entity_dict = {
            "regular_field": "test",
            "nested_field": {"field1": "my_field1"},
        }
        entity = datastore.Entity(key=key)
        entity_dict = {k: v for k, v in entity_dict.items()}
        entity.update(entity_dict)
        client.put(entity)


def my_func(element):
    print(element)
    return element


def run():
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).runner = "DirectRunner"
    p = beam.Pipeline(options=pipeline_options)
    my_ds_query = Query(kind=DATASTORE_KIND, project=PROJECT_ID)
    p | "Read from datastore" >> ReadFromDatastore(
        query=my_ds_query
    ) | "Print entity" >> beam.Map(my_func)
    p.run().wait_until_finish()


if __name__ == "__main__":
    create_datastore_entity()
    run()
{code}
h2. Workaround
Currently, we patched the library with the code below (modifying the Entity class
in `sdks/python/apache_beam/io/gcp/datastore/v1new/types.py`, i.e.
[this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1new/types.py#L231]).
{code:python}
@staticmethod
def from_client_entity(client_entity):
  res = Entity(
      Key.from_client_key(client_entity.key),
      exclude_from_indexes=set(client_entity.exclude_from_indexes))
  for name, value in client_entity.items():
    if isinstance(value, key.Key):
      value = Key.from_client_key(value)
    if isinstance(value, entity.Entity):
      if value.key:
        value = Entity.from_client_entity(value)
      else:
        # Embedded entities created from dict properties have no key:
        # keep them as plain dicts instead of converting them.
        value = {k: v for k, v in value.items()}
    res.properties[name] = value
  return res
{code}
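For reference, here is a quick self-contained check of the conversion. The names and values are just illustrative, and it assumes the patched from_client_entity above is applied:
{code:python}
# Illustrative check only: assumes the patched from_client_entity above
# has been applied to apache_beam.io.gcp.datastore.v1new.types.Entity.
from google.cloud.datastore import entity, key as client_key_module
from apache_beam.io.gcp.datastore.v1new.types import Entity

client_key = client_key_module.Key(
    "my_entity_kind", "my_task", project="my_project_id")
client_entity = entity.Entity(key=client_key)

nested = entity.Entity()                  # keyless embedded entity (dict property)
nested.update({"field1": "my_field1"})
client_entity.update({"regular_field": "test", "nested_field": nested})

beam_entity = Entity.from_client_entity(client_entity)
# With the patch, the nested property comes back as a plain dict
# instead of raising AttributeError on the missing key.
print(beam_entity.properties["nested_field"])  # {'field1': 'my_field1'}
{code}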
If the workaround works for you, I can open a PR.
Thanks, Colin