shunping commented on code in PR #32463:
URL: https://github.com/apache/beam/pull/32463#discussion_r1763692980
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -572,6 +577,9 @@ def _read(self, size=-1):
self._done = True
return res
+ def flush(self):
+ return
Review Comment:
If `flush()` is not defined on `_TruncatingFileHandle`, the following exception is raised when pandas closes the handle:
```
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1358, in process_bundle
result_future =
self._worker_handler.control_conn.push(process_bundle_req)
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
line 384, in push
response = self.worker.do_instruction(request)
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 656, in do_instruction
return getattr(self, request_type)(
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 694, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 1119, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 237, in process_encoded
self.output(decoded_value)
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py",
line 569, in output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py",
line 263, in receive
self.consumer.process(windowed_value)
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py",
line 1073, in process
delayed_applications = self.dofn_runner.process_with_sized_restriction(
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
line 1533, in process_with_sized_restriction
return self.do_fn_invoker.invoke_process(
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
line 897, in invoke_process
residual = self._invoke_process_per_window(
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
line 1059, in _invoke_process_per_window
self.output_handler.handle_process_outputs(
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
line 1677, in handle_process_outputs
for result in results:
File
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/dataframe/io.py",
line 646, in process
for df in frames:
File
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
line 1624, in __next__
return self.get_chunk()
File
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
line 1733, in get_chunk
return self.read(nrows=size)
File
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
line 1708, in read
self.close()
File
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
line 1411, in close
self.handles.close()
File
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/common.py",
line 126, in close
self.handle.flush()
AttributeError: '_TruncatingFileHandle' object has no attribute 'flush'
```
pandas calls `flush()` on the handle when it closes it:
https://github.com/pandas-dev/pandas/blob/081dcdee8d754af90e307cf2311b06b3d02fae2a/pandas/io/common.py#L137
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]