[
https://issues.apache.org/jira/browse/BEAM-9245?focusedWorklogId=766221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-766221
]
ASF GitHub Bot logged work on BEAM-9245:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/May/22 19:00
Start Date: 04/May/22 19:00
Worklog Time Spent: 10m
Work Description: ihji commented on code in PR #17499:
URL: https://github.com/apache/beam/pull/17499#discussion_r865171235
##########
sdks/python/apache_beam/io/gcp/datastore/v1new/types.py:
##########
@@ -267,7 +267,10 @@ def from_client_entity(client_entity):
       if isinstance(value, key.Key):
         value = Key.from_client_key(value)
       if isinstance(value, entity.Entity):
-        value = Entity.from_client_entity(value)
+        if value.key:
+          value = Entity.from_client_entity(value)
+        else:
+          value = {k: v for k, v in value.items()}
Review Comment:
Done.
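
The branch added in the hunk above can be exercised without a Datastore connection. The following is a minimal sketch, not the Beam implementation: `StubKey`, `StubEntity`, and `convert_value` are hypothetical stand-ins, and the only assumption carried over from the client library is that an entity is dict-like and exposes a `key` attribute that may be `None`.

```python
# Hypothetical stand-ins for the google.cloud.datastore types; illustration only.
class StubKey:
    def __init__(self, *flat_path, project=None):
        self.flat_path = flat_path
        self.project = project


class StubEntity(dict):
    """Dict-like entity with an optional key (assumed shape of a client entity)."""

    def __init__(self, key=None, **props):
        super().__init__(**props)
        self.key = key


def convert_value(value):
    """Mirror the patched branch: only keyed entities are converted as entities."""
    if isinstance(value, StubEntity):
        if value.key:
            # Keyed embedded entity: stand-in for Entity.from_client_entity(value).
            return ("entity", value.key.flat_path, dict(value))
        # Keyless embedded entity, i.e. a stored dict property: copy to a plain dict.
        return {k: v for k, v in value.items()}
    return value


nested = StubEntity(field1="my_field1")
keyed = StubEntity(key=StubKey("my_entity_kind", "my_task"), regular_field="test")

print(convert_value(nested))
print(convert_value(keyed))
```

Note that the copy in the keyless branch is shallow, so any values inside the nested dict are passed through unchanged.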
Issue Time Tracking
-------------------
Worklog Id: (was: 766221)
Time Spent: 1h 10m (was: 1h)
> Unable to pull Datastore Entity which contains dict properties
> --------------------------------------------------------------
>
> Key: BEAM-9245
> URL: https://issues.apache.org/jira/browse/BEAM-9245
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.18.0
> Reporter: Colin Le Nost
> Priority: P3
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Hello, we are hitting a small bug when reading Datastore entities with the
> ReadFromDatastore transform (Python SDK, 2.17 and 2.18).
> We are unable to retrieve entities that contain a dictionary property. We
> think these properties are implicitly converted into Datastore entities, and
> when the SDK then tries to convert such a nested entity using its key, it
> breaks (because the nested entity has no key).
> h2. Stacktrace
> {code:python}
>   File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py", line 269, in process
>     yield types.Entity.from_client_entity(client_entity)
>   File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 225, in from_client_entity
>     value = Entity.from_client_entity(value)
>   File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 219, in from_client_entity
>     Key.from_client_key(client_entity.key),
>   File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 156, in from_client_key
>     return Key(client_key.flat_path, project=client_key.project,
> AttributeError: 'NoneType' object has no attribute 'flat_path' [while running 'Read from datastore/Read']
> {code}
>
> h2. Here is some code to reproduce:
> # Insert a datastore entity using the given function
> # Run the dataflow pipeline using DirectRunner
>
> {code:python}
> import apache_beam as beam
> from google.cloud import datastore
> from apache_beam.io.gcp.datastore.v1new.types import Query
> from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
> from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions
>
> DATASTORE_KIND = "my_entity_kind"
> PROJECT_ID = "my_project_id"
>
>
> def create_datastore_entity():
>     client = datastore.Client(PROJECT_ID)
>     key = client.key(DATASTORE_KIND, "my_task")
>     entity = client.get(key=key)
>     if entity is not None:
>         raise Exception("Existing entity")
>     else:
>         # The nested dict is what triggers the failure on read.
>         entity_dict = {"regular_field": "test", "nested_field": {"field1": "my_field1"}}
>         entity = datastore.Entity(key=key)
>         entity_dict = {k: v for k, v in entity_dict.items()}
>         entity.update(entity_dict)
>         client.put(entity)
>
>
> def my_func(element):
>     print(element)
>     return element
>
>
> def run():
>     pipeline_options = PipelineOptions()
>     pipeline_options.view_as(StandardOptions).runner = "DirectRunner"
>     p = beam.Pipeline(options=pipeline_options)
>     my_ds_query = Query(kind=DATASTORE_KIND, project=PROJECT_ID)
>     p | "Read from datastore" >> ReadFromDatastore(
>         query=my_ds_query
>     ) | "Print entity" >> beam.Map(my_func)
>     p.run().wait_until_finish()
>
>
> if __name__ == "__main__":
>     create_datastore_entity()
>     run()
> {code}
> h2. Workaround
> For now, we have patched the library with the code below (modifying the
> Entity class in `sdks/python/apache_beam/io/gcp/datastore/v1new/types.py`, at
> this [line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1new/types.py#L231]).
> {code:python}
> @staticmethod
> def from_client_entity(client_entity):
>   res = Entity(
>       Key.from_client_key(client_entity.key),
>       exclude_from_indexes=set(client_entity.exclude_from_indexes))
>   for name, value in client_entity.items():
>     if isinstance(value, key.Key):
>       value = Key.from_client_key(value)
>     if isinstance(value, entity.Entity):
>       if value.key:
>         value = Entity.from_client_entity(value)
>       else:
>         value = {k: v for k, v in value.items()}
>     res.properties[name] = value
>   return res
> {code}
> If this workaround looks acceptable to you, I can open the PR.
>
> Thanks, Colin
>
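
The last frame of the quoted stack trace can be reproduced offline. This is a minimal sketch, assuming (as the report suggests) that the nested dict is read back as an entity whose key is `None`; `from_client_key_sketch` is a hypothetical stand-in that only repeats the attribute access from the failing line and is not the Beam function itself.

```python
def from_client_key_sketch(client_key):
    # Same attribute access pattern as the failing line in the trace.
    return (client_key.flat_path, client_key.project)


try:
    # A keyless nested entity hands None to the key conversion.
    from_client_key_sketch(None)
except AttributeError as exc:
    message = str(exc)
    print(message)
```

Checking the key before converting, as the workaround does, avoids reaching this call for keyless nested entities.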