[
https://issues.apache.org/jira/browse/BEAM-7496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mingliang Qi updated BEAM-7496:
-------------------------------
Description:
There is an off-by-one index bug in hadoopfilesystem.py:
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/hadoopfilesystem.py#L72]
According to the description of the inherited function, the end index should not be included:
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystemio.py#L51]
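In other words, the interface specifies the half-open byte range [start, end), so the implementation must request end - start bytes rather than end - start + 1. A minimal sketch of the corrected method, assuming the hdfs client's read() context manager used at the linked line (class context abbreviated, so this is illustrative rather than a definitive patch):
{code}
# Sketch of the fix in HdfsDownloader.get_range() (hadoopfilesystem.py);
# the surrounding class is omitted here.
def get_range(self, start, end):
  # The Downloader interface in filesystemio.py defines the range as
  # [start, end), i.e. end exclusive, so exactly end - start bytes are
  # needed; end - start + 1 fetches one byte too many.
  with self._hdfs_client.read(
      self._path, offset=start, length=end - start) as reader:
    return reader.read()
{code}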
This leads to the following error at runtime:
{code}
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 406, in run
    self._options).run(False)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 419, in run
    return self.runner.run_pipeline(self, self._options)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.py", line 132, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 276, in run_pipeline
    default_environment=self._default_environment))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 280, in run_via_runner_api
    return self.run_stages(*self.create_stages(pipeline_proto))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 356, in run_stages
    stage_context.safe_coders)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 511, in run_stage
    data_input, data_output)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 1217, in process_bundle
    result_future = self._controller.control_handler.push(process_bundle)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 832, in push
    response = self.worker.do_instruction(request)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 312, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 331, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 554, in process_bundle
    ].process_encoded(data.data)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 140, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 245, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 246, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 396, in apache_beam.runners.worker.operations.ImpulseReadOperation.process
  File "apache_beam/runners/worker/operations.py", line 398, in apache_beam.runners.worker.operations.ImpulseReadOperation.process
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/concat_source.py", line 86, in read
    range_tracker.sub_range_tracker(source_ix)):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/tfrecordio.py", line 183, in read_records
    record = _TFRecordUtil.read_record(file_handle)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/tfrecordio.py", line 123, in read_record
    buf = file_handle.read(buf_length_expected)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystem.py", line 261, in read
    self._fetch_to_internal_buffer(num_bytes)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystem.py", line 210, in _fetch_to_internal_buffer
    buf = self._file.read(self._read_size)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystemio.py", line 112, in readinto
    b[:len(data)] = data
ValueError: cannot modify size of memoryview object
{code}
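The final ValueError comes from DownloaderStream.readinto() in filesystemio.py: the downloaded bytes are slice-assigned into a caller-supplied memoryview of fixed size, and the extra byte returned by get_range() would force the view to grow. A standalone reproduction (Python 2.7 wording, matching the trace above; Python 3 raises a ValueError with different text):
{code}
# Minimal reproduction of the error, independent of Beam.
buf = bytearray(4)        # readinto() was given a 4-byte buffer
data = b'12345'           # the downloader returned 5 bytes (end included)
view = memoryview(buf)
view[:len(data)] = data   # ValueError: cannot modify size of memoryview object
{code}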
> python sdk fails to read data from HDFS
> ---------------------------------------
>
> Key: BEAM-7496
> URL: https://issues.apache.org/jira/browse/BEAM-7496
> Project: Beam
> Issue Type: Bug
> Components: io-python-hadoop
> Affects Versions: 2.12.0
> Reporter: Mingliang Qi
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)