[ 
https://issues.apache.org/jira/browse/BEAM-11915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299901#comment-17299901
 ] 

Muhammad Anas commented on BEAM-11915:
--------------------------------------

[~yichi] Got it.

Our use case is that our documents are arranged in an arbitrarily nested tree
structure. Child documents have a {{parent_id}} field that contains the {{_id}}
of the parent document. We also store the {{_id}} of the root document as a
{{root_id}} field on every document in that tree.
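
Here is a minimal sketch of the tree layout described above. The `make_doc` helper, the `name` field and the use of `uuid.uuid4()` to generate the GUID are assumptions for illustration only. In this sketch, `_id` is a plain string rather than a BSON ObjectId, which matters for the splitting error reported in this issue.

```python
import uuid


def make_doc(parent=None, **fields):
    """Hypothetical helper: build one document of the nested tree.

    A custom GUID is generated up front and used as _id. Children point at
    their parent's _id via parent_id, and every document, including the
    root, carries the root's _id as root_id.
    """
    doc_id = str(uuid.uuid4())
    doc = dict(fields, _id=doc_id)
    if parent is None:
        doc["root_id"] = doc_id  # the root is its own root
    else:
        doc["parent_id"] = parent["_id"]
        doc["root_id"] = parent["root_id"]
    return doc


root = make_doc(name="root")
child = make_doc(root, name="child")
grandchild = make_doc(child, name="grandchild")

# Every document of one tree can be selected with a single root_id filter.
tree = [d for d in (root, child, grandchild) if d["root_id"] == root["_id"]]
```
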
We use custom GUIDs as the {{_id}} field because some of the documents need to
be updated after insertion. We generate a GUID upfront and then send both
inserts and updates as {{upsert}} operations that match on this custom-built,
stable GUID. We don't care about the order in which the upserts are processed,
because our updates only set fields that were not set during the insert.
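
The order-independence argument can be checked with a small in-memory simulation. The `upsert` function below is a hypothetical stand-in that approximates a pymongo `update_one(..., upsert=True)` call with a `$set` update. It does not talk to MongoDB, and the field values are illustrative. Both orderings of one insert and one update should produce the same final document.

```python
import itertools
import uuid


def upsert(store, guid, fields):
    """In-memory stand-in for an upsert matched on a custom _id.

    Approximates pymongo's
        collection.update_one({"_id": guid}, {"$set": fields}, upsert=True)
    by creating the document if it is missing and then setting the fields.
    """
    doc = store.setdefault(guid, {"_id": guid})
    doc.update(fields)


guid = str(uuid.uuid4())  # generated before anything is written
insert_fields = {"name": "node", "root_id": guid}  # hypothetical insert payload
update_fields = {"status": "processed"}  # only sets a field the insert never sets

final_states = []
for order in itertools.permutations([insert_fields, update_fields]):
    store = {}  # a fresh "collection" for each ordering
    for fields in order:
        upsert(store, guid, fields)
    final_states.append(store[guid])
```

This guarantee depends on the insert and the update touching disjoint fields. If an update overwrote a field that the insert also sets, the two orderings could produce different documents.
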
You are right. We can move the custom GUIDs to another field and use ObjectId
as {{_id}}. We can put a unique index on the GUID field to optimize queries on
it, and we can also upsert by matching on that field. Do you see any downside
to this?
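
Here is a sketch of the proposed alternative. It assumes the GUID moves to a field named `guid`, which is a hypothetical name. The sketch only builds the filter and update documents, so it runs without a server. The docstring notes the pymongo calls that would receive them.

```python
def guid_upsert_args(guid, fields):
    """Build (filter, update) documents for an upsert keyed on a GUID field.

    "guid" is a hypothetical field name. With pymongo these would be used as:
        collection.create_index("guid", unique=True)  # once, up front
        collection.update_one(filt, update, upsert=True)
    Because the filter does not mention _id, a document created by the
    upsert gets a server-generated ObjectId _id, plus the guid value taken
    from the equality filter.
    """
    reserved = {"_id", "guid"} & set(fields)
    if reserved:
        # Neither the ObjectId _id nor the stable GUID should be rewritten.
        raise ValueError(f"fields must not set {sorted(reserved)}")
    filt = {"guid": guid}
    update = {"$set": dict(fields)}
    return filt, update


filt, update = guid_upsert_args("3f2b-example-guid", {"status": "processed"})
```
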

> Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> --------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11915
>                 URL: https://issues.apache.org/jira/browse/BEAM-11915
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-mongodb
>    Affects Versions: 2.28.0
>            Reporter: Muhammad Anas
>            Assignee: Yichi Zhang
>            Priority: P2
>
> I am trying to use the ReadFromMongoDB transform to read data from a MongoDB
> collection. I tried both a MongoDB cluster hosted on Atlas (with
> bucket_auto=True) and a standalone server running locally (without
> bucket_auto=True). I get a similar error in both cases, although it
> originates in a different function in each case.
> *Python Version:* 3.8.8
> *Runner:* Direct
> *OS:* Windows 10 (64-bit)
> *apache-beam Python SDK version:* 2.28.0
> Here is a minimal pipeline that reproduces the error:
> {code:python}
> """A mongodb io workflow."""
> import argparse
> import logging
>
> import apache_beam as beam
> from apache_beam.io import ReadFromMongoDB
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
>
>
> def run(argv=None, save_main_session=True):
>   """Main entry point; defines and runs the pipeline."""
>   parser = argparse.ArgumentParser()
>   parser.add_argument(
>       '--output',
>       dest='output',
>       required=True,
>       help='Output file to write results to.')
>   known_args, pipeline_args = parser.parse_known_args(argv)
>   pipeline_options = PipelineOptions(pipeline_args)
>   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>   with beam.Pipeline(options=pipeline_options) as p:
>     # Read MongoDB documents into a PCollection.
>     step_runs = p | 'Read' >> ReadFromMongoDB(
>         uri='mongodb://127.0.0.1:27017',
>         db='mydb',
>         coll='mycoll',
>         # bucket_auto=True,  # enabled when reading from Atlas
>     )
>
>
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   run()
> {code}
>  
> Stack trace when connecting to a local standalone MongoDB server:
> {noformat}
> Traceback (most recent call last):
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
>     split_keys = self._get_split_keys(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File ".\mongo-pipe-test.py", line 60, in <module>
>     run()
>   File ".\mongo-pipe-test.py", line 45, in run
>     def format_result(step_run):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
>     self.result = self.run()
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
>     stage_results = self._run_stage(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
>     split_keys = self._get_split_keys(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
> {noformat}
> Stack trace when connecting to MongoDB hosted on Atlas:
> {noformat}
> Traceback (most recent call last):
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
>     for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File ".\mongo-pipe-test.py", line 59, in <module>
>     run()
>   File ".\mongo-pipe-test.py", line 44, in run
>     def format_result(step_run):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
>     self.result = self.run()
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
>     stage_results = self._run_stage(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
>     for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
> {noformat}

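Both tracebacks fail on the same check, `start_pos >= _ObjectIdHelper.increment_id(end_pos, -1)`. The first fails in `_get_split_keys` and the second in `_get_auto_buckets`. The error message indicates that `start_pos` is a `str`, which is consistent with the custom GUID `_id` values described in the comment above, while the helper returns an ObjectId. The sketch below reproduces that failure mode with a hypothetical stand-in class instead of the real `bson` package, so it runs with the standard library alone. It assumes that, as with `bson.ObjectId`, ordering is defined only between ObjectIds.

```python
class ObjectId:
    """Hypothetical stand-in for bson.objectid.ObjectId (not the real class).

    It only defines ordering against other ObjectIds and returns
    NotImplemented for anything else, so Python 3 raises TypeError when
    one is compared with a str.
    """

    def __init__(self, oid: bytes):
        self._oid = oid

    def __ge__(self, other):
        if isinstance(other, ObjectId):
            return self._oid >= other._oid
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, ObjectId):
            return self._oid <= other._oid
        return NotImplemented


def compare_bounds(start_pos, end_pos):
    """Mimic the `start_pos >= end_pos` check seen in the tracebacks."""
    try:
        return start_pos >= end_pos
    except TypeError as exc:
        return f"TypeError: {exc}"


# A custom string _id on the left, an ObjectId bound on the right.
result = compare_bounds("custom-guid", ObjectId(bytes(12)))
```

Comparing two stand-in ObjectIds works normally. Only the mixed `str`/ObjectId comparison fails, which is why keeping `_id` as an ObjectId and moving the GUID to another field avoids this code path.
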


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
