Maksym Skorupskyi created BEAM-12120:
----------------------------------------
Summary: Python IO MongoDB: integer and string `_id` keys are not
supported
Key: BEAM-12120
URL: https://issues.apache.org/jira/browse/BEAM-12120
Project: Beam
Issue Type: Bug
Components: io-py-mongodb
Affects Versions: 2.28.0, 2.27.0
Reporter: Maksym Skorupskyi
Assignee: Yichi Zhang
Python IO MongoDB: integer and string `_id` keys are not supported.
Usually *ObjectId* is used for the *`_id`* key, but sometimes you have to deal with
*int* and *str* keys. Reading from such a MongoDB collection raises errors.
h2. Integer `_id` key:
{code:java}
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1426, in process
element)
File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545,
in initial_restriction
range_tracker = element_source.get_range_tracker(None, None)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
243, in get_range_tracker
start_position, stop_position)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
359, in _replace_none_positions
stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
418, in increment_id
id_number = _ObjectIdHelper.id_to_int(object_id)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
383, in id_to_int
ints = struct.unpack('>III', id.binary)
AttributeError: 'int' object has no attribute 'binary'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in
<module>
sys.exit(run())
File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
options=options, ingestion_ts=ingestion_ts, table_name=table_name
File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582,
in __exit__
self.result = self.run()
File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561,
in run
return self.runner.run_pipeline(self, self._options)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
line 126, in run_pipeline
return runner.run_pipeline(pipeline, options)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 183, in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 193, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 360, in run_stages
bundle_context_manager,
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 556, in _run_stage
bundle_manager)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 596, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 897, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
line 380, in push
response = self.worker.do_instruction(request)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1000, in process_bundle
element.data)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 228, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 357, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 359, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446,
in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1426, in process
element)
File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545,
in initial_restriction
range_tracker = element_source.get_range_tracker(None, None)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
243, in get_range_tracker
start_position, stop_position)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
359, in _replace_none_positions
stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
418, in increment_id
id_number = _ObjectIdHelper.id_to_int(object_id)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
383, in id_to_int
ints = struct.unpack('>III', id.binary)
AttributeError: 'int' object has no attribute 'binary' [while running
'sources/Read
Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
ERROR:root:mongo_to_bq_raw.py: 'int' object has no attribute 'binary' [while
running 'sources/Read
Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
{code}
h2. String `_id` key:
{code:java}
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1426, in process
element)
File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545,
in initial_restriction
range_tracker = element_source.get_range_tracker(None, None)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
243, in get_range_tracker
start_position, stop_position)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
359, in _replace_none_positions
stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
418, in increment_id
id_number = _ObjectIdHelper.id_to_int(object_id)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
383, in id_to_int
ints = struct.unpack('>III', id.binary)
AttributeError: 'str' object has no attribute 'binary'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in
<module>
sys.exit(run())
File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
options=options, ingestion_ts=ingestion_ts, table_name=table_name
File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582,
in __exit__
self.result = self.run()
File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561,
in run
return self.runner.run_pipeline(self, self._options)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
line 126, in run_pipeline
return runner.run_pipeline(pipeline, options)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 183, in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 193, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 360, in run_stages
bundle_context_manager,
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 556, in _run_stage
bundle_manager)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 596, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 897, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
line 380, in push
response = self.worker.do_instruction(request)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1000, in process_bundle
element.data)
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 228, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 357, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 359, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446,
in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1426, in process
element)
File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545,
in initial_restriction
range_tracker = element_source.get_range_tracker(None, None)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
243, in get_range_tracker
start_position, stop_position)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
359, in _replace_none_positions
stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
418, in increment_id
id_number = _ObjectIdHelper.id_to_int(object_id)
File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line
383, in id_to_int
ints = struct.unpack('>III', id.binary)
AttributeError: 'str' object has no attribute 'binary' [while running
'tagged_objects/Read
Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
ERROR:root:mongo_to_bq_raw.py: 'str' object has no attribute 'binary' [while
running 'tagged_objects/Read
Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)