[ 
https://issues.apache.org/jira/browse/BEAM-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri updated BEAM-7608:
----------------------------
    Status: Open  (was: Triage Needed)

> v1new ReadFromDatastore skips entities
> --------------------------------------
>
>                 Key: BEAM-7608
>                 URL: https://issues.apache.org/jira/browse/BEAM-7608
>             Project: Beam
>          Issue Type: Bug
>          Components: io-python-gcp
>    Affects Versions: 2.13.0
>         Environment: MacOS 10.14.5, Python 2.7
>            Reporter: Jacob Gur
>            Assignee: Udi Meiri
>            Priority: Critical
>
> A simple map over a Datastore kind in the local emulator using the new
> v1new.datastoreio.ReadFromDatastore skips entities.
> The kind has 1516 entities. When I map over it using the old
> ReadFromDatastore transform, it maps all of them, i.e., I can map each
> entity to its id and write the ids to a text file.
> But the new transform only maps 365 entities. There is no error. The tail of 
> the standard output is:
> {code:java}
> INFO:root:Latest stats timestamp for kind face_apilog is 2019-06-18 08:15:21+00:00
> INFO:root:Estimated size bytes for query: 116188
> INFO:root:Splitting the query into 12 splits
> INFO:root:Running (((GetEntities/Reshuffle/ReshufflePerKey/GroupByKey/Read)(ref_AppliedPTransform_GetEntities/Reshuffle/ReshufflePerKey/FlatMap(restore_timestamps)_14))((ref_AppliedPTransform_GetEntities/Reshuffle/RemoveRandomKeys_15)(ref_AppliedPTransform_GetEntities/Read_16)))((ref_AppliedPTransform_MapToId_17)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WriteBundles_24)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Pair_25)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WindowInto(WindowIntoFn)_26)(WriteToFile/Write/WriteImpl/GroupByKey/Write)))))
> INFO:root:Running (WriteToFile/Write/WriteImpl/GroupByKey/Read)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Extract_31)(ref_PCollection_PCollection_20/Write))
> INFO:root:Running (ref_PCollection_PCollection_12/Read)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/PreFinalize_32)(ref_PCollection_PCollection_21/Write))
> INFO:root:Running (ref_PCollection_PCollection_12/Read)+(ref_AppliedPTransform_WriteToFile/Write/WriteImpl/FinalizeWrite_33)
> INFO:root:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
> INFO:root:Renamed 1 shards in 0.12 seconds.
> {code}
>  
> The code for the job on the new transform is:
>  
>  
> {code:python}
> from __future__ import absolute_import
>
> import logging
> import os
> import sys
>
> import apache_beam as beam
> from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
> from apache_beam.io.gcp.datastore.v1new.types import Query
>
> # TODO: should be set outside of python process
> os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
>
> def map_to_id(element):
>     face_log_id = element.to_client_entity().id
>     return face_log_id
>
> def run(argv=None):
>     p = beam.Pipeline(argv=argv)
>     project = 'dev'
>     (p
>      | 'GetEntities' >> ReadFromDatastore(
>          Query(kind='face_apilog', project=project))
>      | 'MapToId' >> beam.Map(map_to_id)
>      | 'WriteToFile' >> beam.io.WriteToText('result')
>     )
>     p.run().wait_until_finish()
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run(sys.argv)
> {code}
>  
> For comparison, the code for the job on the old transform is:
>  
> {code:python}
> from __future__ import absolute_import
>
> import logging
> import os
> import sys
>
> import apache_beam as beam
> from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
> from google.cloud.proto.datastore.v1 import query_pb2
>
> # TODO: should be set outside of python process
> os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
>
> def map_to_id(element):
>     face_log_id = element.key.path[-1].id
>     return face_log_id
>
> def run(argv=None):
>     p = beam.Pipeline(argv=argv)
>     project = 'dev'
>     query = query_pb2.Query()
>     query.kind.add().name = 'face_apilog'
>     (p
>      | 'GetEntities' >> ReadFromDatastore(project=project, query=query)
>      # TODO: ParDo???
>      | 'MapToId' >> beam.Map(map_to_id)
>      | 'WriteToFile' >> beam.io.WriteToText('result')
>     )
>     p.run().wait_until_finish()
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run(sys.argv)
> {code}
>  
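
Since both jobs write one id per line, one way to narrow down which entities the new transform skips might be to diff the two output files. The following is only a sketch under assumptions not stated in the report: the helper names `read_ids` and `missing_ids` are hypothetical, and it assumes each job's output shard has been copied to a separate file before the other job runs (both jobs above write to the same `result` prefix).

```python
def read_ids(path):
    """Return the set of non-empty lines in a file (one entity id per line)."""
    with open(path) as f:
        return {line.strip() for line in f if line.strip()}


def missing_ids(old_path, new_path):
    """Return, sorted as strings, the ids in old_path that are absent from new_path."""
    return sorted(read_ids(old_path) - read_ids(new_path))


if __name__ == '__main__':
    import sys
    # Usage (file names are placeholders): python diff_ids.py old_ids.txt new_ids.txt
    if len(sys.argv) == 3:
        for entity_id in missing_ids(sys.argv[1], sys.argv[2]):
            print(entity_id)
```

If the missing ids cluster in contiguous key ranges, that could point at the query splitting step rather than at the read itself, but that is a guess and not something the log above confirms.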



