[ 
https://issues.apache.org/jira/browse/BEAM-7747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889229#comment-16889229
 ] 

Valentyn Tymofieiev commented on BEAM-7747:
-------------------------------------------

Currently, _FastAvroSink.close() calls [1] the `flush()` method on the FastAvro
writer, which does not close the file handle [2]. On Windows, the temporary
output file created by the sink cannot be accessed until its handle is closed.
PR #9111 is out to fix this.

[1] 
https://github.com/apache/beam/blob/2bbf6eb1f65f49625d2f17b2703ec1e774f8c85f/sdks/python/apache_beam/io/avroio.py#L631
[2] 
https://github.com/fastavro/fastavro/blob/53ed64d95d4f82875d1238b1f76d4d87547e40e1/fastavro/_write.pyx#L548

> ERROR: test_sink_transform (apache_beam.io.avroio_test.TestFastAvro) Fails on 
> Windows
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-7747
>                 URL: https://issues.apache.org/jira/browse/BEAM-7747
>             Project: Beam
>          Issue Type: Bug
>          Components: io-python-avro, test-failures
>            Reporter: Valentyn Tymofieiev
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> ======================================================================
> ERROR: test_sink_transform (apache_beam.io.avroio_test.TestFastAvro)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "C:\projects\beam\sdks\python\apache_beam\io\avroio_test.py", line 
> 436, in test_sink_transform
>     | avroio.WriteToAvro(path, self.SCHEMA, use_fastavro=self.use_fastavro)
>   File "C:\projects\beam\sdks\python\apache_beam\pipeline.py", line 426, in 
> __exit__
>     self.run().wait_until_finish()
>   File "C:\projects\beam\sdks\python\apache_beam\testing\test_pipeline.py", 
> line 107, in run
>     else test_runner_api))
>   File "C:\projects\beam\sdks\python\apache_beam\pipeline.py", line 406, in 
> run
>     self._options).run(False)
>   File "C:\projects\beam\sdks\python\apache_beam\pipeline.py", line 419, in 
> run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\direct\direct_runner.py", 
> line 128, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 319, in run_pipeline
>     default_environment=self._default_environment))
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 326, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 408, in run_stages
>     stage_context.safe_coders)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 681, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 1562, in process_bundle
>     part_inputs):
>   File "C:\venv\newenv1\lib\site-packages\concurrent\futures\_base.py", line 
> 641, in result_iterator
>     yield fs.pop().result()
>   File "C:\venv\newenv1\lib\site-packages\concurrent\futures\_base.py", line 
> 462, in result
>     return self.__get_result()
>   File "C:\venv\newenv1\lib\site-packages\concurrent\futures\thread.py", line 
> 63, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 1561, in <lambda>
>     self._registered).process_bundle(part, expected_outputs),
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 1500, in process_bundle
>     result_future = self._controller.control_handler.push(process_bundle_req)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\portability\fn_api_runner.py",
>  line 1017, in push
>     response = self.worker.do_instruction(request)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\worker\sdk_worker.py", line 
> 342, in do_instruction
>     request.instruction_id)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\worker\sdk_worker.py", line 
> 368, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\worker\bundle_processor.py",
>  line 593, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\worker\bundle_processor.py",
>  line 143, in process_encoded
>     self.output(decoded_value)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\worker\operations.py", line 
> 256, in output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\worker\operations.py", line 
> 143, in receive
>     self.consumer.process(windowed_value)
>   File 
> "C:\projects\beam\sdks\python\apache_beam\runners\worker\operations.py", line 
> 594, in process
>     delayed_application = self.dofn_receiver.receive(o)
>   File "C:\projects\beam\sdks\python\apache_beam\runners\common.py", line 
> 795, in receive
>     self.process(windowed_value)
>   File "C:\projects\beam\sdks\python\apache_beam\runners\common.py", line 
> 801, in process
>     self._reraise_augmented(exn)
>   File "C:\projects\beam\sdks\python\apache_beam\runners\common.py", line 
> 868, in _reraise_augmented
>     raise_with_traceback(new_exn)
>   File "C:\projects\beam\sdks\python\apache_beam\runners\common.py", line 
> 799, in process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "C:\projects\beam\sdks\python\apache_beam\runners\common.py", line 
> 611, in invoke_process
>     windowed_value, additional_args, additional_kwargs, output_processor)
>   File "C:\projects\beam\sdks\python\apache_beam\runners\common.py", line 
> 683, in _invoke_process_per_window
>     windowed_value, self.process_method(*args_for_process))
>   File "C:\projects\beam\sdks\python\apache_beam\runners\common.py", line 
> 914, in process_outputs
>     for result in results:
>   File "C:\projects\beam\sdks\python\apache_beam\io\iobase.py", line 1085, in 
> <genexpr>
>     window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
>   File "C:\projects\beam\sdks\python\apache_beam\io\filebasedsink.py", line 
> 322, in finalize_write
>     'Encountered exceptions in finalize_write: %s' % all_exceptions)
> Exception: Encountered exceptions in finalize_write: 
> [IOError(WindowsError(32, 'The process cannot access the file because
> it is being used by another process'),)] [while running 
> 'WriteToAvro/Write/WriteImpl/FinalizeWrite']



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to