[ 
https://issues.apache.org/jira/browse/BEAM-12119?focusedWorklogId=607234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-607234
 ]

ASF GitHub Bot logged work on BEAM-12119:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jun/21 18:24
            Start Date: 04/Jun/21 18:24
    Worklog Time Spent: 10m 
      Work Description: y1chi commented on a change in pull request #14460:
URL: https://github.com/apache/beam/pull/14460#discussion_r645769288



##########
File path: sdks/python/apache_beam/io/mongodbio.py
##########
@@ -136,30 +149,195 @@ def __init__(
 
     Returns:
       :class:`~apache_beam.transforms.ptransform.PTransform`
-
     """
     if extra_client_params is None:
       extra_client_params = {}
     if not isinstance(db, str):
-      raise ValueError('ReadFromMongDB db param must be specified as a string')
+      raise ValueError("ReadFromMongDB db param must be specified as a string")
     if not isinstance(coll, str):
       raise ValueError(
-          'ReadFromMongDB coll param must be specified as a '
-          'string')
+          "ReadFromMongDB coll param must be specified as a string")
     self._mongo_source = _BoundedMongoSource(
         uri=uri,
         db=db,
         coll=coll,
         filter=filter,
         projection=projection,
         extra_client_params=extra_client_params,
-        bucket_auto=bucket_auto)
+        bucket_auto=bucket_auto,
+    )
 
   def expand(self, pcoll):
     return pcoll | iobase.Read(self._mongo_source)
 
 
+class _ObjectIdRangeTracker(OrderedPositionRangeTracker):
+  """RangeTracker for tracking mongodb _id of bson ObjectId type."""
+  def position_to_fraction(
+      self,
+      pos: ObjectId,
+      start: ObjectId,
+      end: ObjectId,
+  ):
+    """Returns the fraction of keys in the range [start, end) that
+    are less than the given key.
+    """
+    pos_number = _ObjectIdHelper.id_to_int(pos)
+    start_number = _ObjectIdHelper.id_to_int(start)
+    end_number = _ObjectIdHelper.id_to_int(end)
+    return (pos_number - start_number) / (end_number - start_number)
+
+  def fraction_to_position(
+      self,
+      fraction: float,
+      start: ObjectId,
+      end: ObjectId,
+  ):
+    """Converts a fraction between 0 and 1
+    to a position between start and end.
+    """
+    start_number = _ObjectIdHelper.id_to_int(start)
+    end_number = _ObjectIdHelper.id_to_int(end)
+    total = end_number - start_number
+    pos = int(total * fraction + start_number)
+    # make sure split position is larger than start position and smaller than
+    # end position.
+    if pos <= start_number:
+      return _ObjectIdHelper.increment_id(start, 1)
+
+    if pos >= end_number:
+      return _ObjectIdHelper.increment_id(end, -1)
+
+    return _ObjectIdHelper.int_to_id(pos)
+
+
+class _StrRangeTracker(LexicographicKeyRangeTracker):

Review comment:
       The amount of duplication makes me feel that either 
LexicographicKeyRangeTracker should handle both bytes and str, or the common 
logic should be extracted to helper functions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 607234)
    Time Spent: 13h 40m  (was: 13.5h)

> Python IO MongoDB: integer and string `_id` keys are not supported
> ------------------------------------------------------------------
>
>                 Key: BEAM-12119
>                 URL: https://issues.apache.org/jira/browse/BEAM-12119
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-mongodb
>    Affects Versions: 2.27.0, 2.28.0
>            Reporter: Maksym Skorupskyi
>            Priority: P2
>              Labels: MongoDB, Python
>          Time Spent: 13h 40m
>  Remaining Estimate: 0h
>
> Python IO MongoDB: integer and string `_id` keys are not supported.
>  
> Usually *ObjectId* is used for the *`_id`* key, but sometimes you have to deal 
> with *int* and *str* keys. Reading from such a MongoDB collection raises errors. 
> h2. Integer `_id` key:
> {code:java}
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 
> 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'int' object has no attribute 'binary'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in 
> <module>
>     sys.exit(run())
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
>     options=options, ingestion_ts=ingestion_ts, table_name=table_name
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, 
> in __exit__
>     self.result = self.run()
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, 
> in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 126, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 183, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 360, in run_stages
>     bundle_context_manager,
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 556, in _run_stage
>     bundle_manager)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 596, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 897, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>  line 380, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", 
> line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", 
> line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1000, in process_bundle
>     element.data)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 
> 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 
> 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'int' object has no attribute 'binary' [while running 
> 'sources/Read 
> Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> ERROR:root:mongo_to_bq_raw.py: 'int' object has no attribute 'binary' [while 
> running 'sources/Read 
> Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> {code}
> h2. String `_id` key:
> {code:java}
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 
> 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'str' object has no attribute 'binary'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in 
> <module>
>     sys.exit(run())
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
>     options=options, ingestion_ts=ingestion_ts, table_name=table_name
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, 
> in __exit__
>     self.result = self.run()
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, 
> in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 126, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 183, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 360, in run_stages
>     bundle_context_manager,
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 556, in _run_stage
>     bundle_manager)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 596, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 897, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>  line 380, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", 
> line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", 
> line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1000, in process_bundle
>     element.data)
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 
> 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File 
> "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 
> 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 
> 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'str' object has no attribute 'binary' [while running 
> 'tagged_objects/Read 
> Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> ERROR:root:mongo_to_bq_raw.py: 'str' object has no attribute 'binary' [while 
> running 'tagged_objects/Read 
> Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to