shunping commented on code in PR #32463:
URL: https://github.com/apache/beam/pull/32463#discussion_r1763692980


##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -572,6 +577,9 @@ def _read(self, size=-1):
       self._done = True
     return res
 
+  def flush(self):
+    return

Review Comment:
   If not set, an exception will occur.
   ```
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1358, in process_bundle
       result_future = 
self._worker_handler.control_conn.push(process_bundle_req)
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
 line 384, in push
       response = self.worker.do_instruction(request)
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 656, in do_instruction
       return getattr(self, request_type)(
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 694, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 1119, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 237, in process_encoded
       self.output(decoded_value)
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py",
 line 569, in output
       _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py",
 line 263, in receive
       self.consumer.process(windowed_value)
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py",
 line 1073, in process
       delayed_applications = self.dofn_runner.process_with_sized_restriction(
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
 line 1533, in process_with_sized_restriction
       return self.do_fn_invoker.invoke_process(
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
 line 897, in invoke_process
       residual = self._invoke_process_per_window(
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
 line 1059, in _invoke_process_per_window
       self.output_handler.handle_process_outputs(
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py",
 line 1677, in handle_process_outputs
       for result in results:
     File 
"/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/dataframe/io.py",
 line 646, in process
       for df in frames:
     File 
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
 line 1624, in __next__
       return self.get_chunk()
     File 
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
 line 1733, in get_chunk
       return self.read(nrows=size)
     File 
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
 line 1708, in read
       self.close()
     File 
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py",
 line 1411, in close
       self.handles.close()
     File 
"/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/common.py",
 line 126, in close
       self.handle.flush()
   AttributeError: '_TruncatingFileHandle' object has no attribute 'flush'
   ```
   
   It is in pandas that flush() is called when it closes a handle: 
https://github.com/pandas-dev/pandas/blob/081dcdee8d754af90e307cf2311b06b3d02fae2a/pandas/io/common.py#L137,
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to