Wouldn't you want to import filesystems to do validation of paths,
etc? (Also, registering an imported object is less error-prone than
registering a string.)

On Fri, Jul 7, 2017 at 9:29 PM, Chamikara Jayalath (JIRA)
<j...@apache.org> wrote:
>
>     [ 
> https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078949#comment-16078949
>  ]
>
> Chamikara Jayalath commented on BEAM-2573:
> ------------------------------------------
>
> Right. Usually you should not have to import FileSystem implementations 
> during pipeline construction and importing just for invoking the side affect 
> of being included in beam_plugin has downsides (lint errors in user program, 
> non-obvious api, relying on runner specific code). It might be better to have 
> a more direct API for registering FileSystems.
>
>> Better filesystem discovery mechanism in Python SDK
>> ---------------------------------------------------
>>
>>                 Key: BEAM-2573
>>                 URL: https://issues.apache.org/jira/browse/BEAM-2573
>>             Project: Beam
>>          Issue Type: Task
>>          Components: runner-dataflow, sdk-py
>>    Affects Versions: 2.0.0
>>            Reporter: Dmitry Demeshchuk
>>            Priority: Minor
>>
>> It looks like right now custom filesystem classes have to be imported 
>> explicitly: 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>> Seems like the current implementation doesn't allow discovering filesystems 
>> that come from side packages, not from apache_beam itself. Even if I put a 
>> custom FileSystem-inheriting class into a package and explicitly import it 
>> in the root __init__.py of that package, it still doesn't make the class 
>> discoverable.
>> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
>> works just fine. Here's an example of Dataflow output:
>> {code}
>>   (320418708fe777d7): Traceback (most recent call last):
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", 
>> line 581, in do_work
>>     work_executor.execute()
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
>> line 166, in execute
>>     op.start()
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>>  line 54, in start
>>     self.output(windowed_value)
>>   File "dataflow_worker/operations.py", line 138, in 
>> dataflow_worker.operations.Operation.output 
>> (dataflow_worker/operations.c:5768)
>>     def output(self, windowed_value, output_index=0):
>>   File "dataflow_worker/operations.py", line 139, in 
>> dataflow_worker.operations.Operation.output 
>> (dataflow_worker/operations.c:5654)
>>     cython.cast(Receiver, 
>> self.receivers[output_index]).receive(windowed_value)
>>   File "dataflow_worker/operations.py", line 72, in 
>> dataflow_worker.operations.ConsumerSet.receive 
>> (dataflow_worker/operations.c:3506)
>>     cython.cast(Operation, consumer).process(windowed_value)
>>   File "dataflow_worker/operations.py", line 328, in 
>> dataflow_worker.operations.DoOperation.process 
>> (dataflow_worker/operations.c:11162)
>>     with self.scoped_process_state:
>>   File "dataflow_worker/operations.py", line 329, in 
>> dataflow_worker.operations.DoOperation.process 
>> (dataflow_worker/operations.c:11116)
>>     self.dofn_receiver.receive(o)
>>   File "apache_beam/runners/common.py", line 382, in 
>> apache_beam.runners.common.DoFnRunner.receive 
>> (apache_beam/runners/common.c:10156)
>>     self.process(windowed_value)
>>   File "apache_beam/runners/common.py", line 390, in 
>> apache_beam.runners.common.DoFnRunner.process 
>> (apache_beam/runners/common.c:10458)
>>     self._reraise_augmented(exn)
>>   File "apache_beam/runners/common.py", line 431, in 
>> apache_beam.runners.common.DoFnRunner._reraise_augmented 
>> (apache_beam/runners/common.c:11673)
>>     raise new_exn, None, original_traceback
>>   File "apache_beam/runners/common.py", line 388, in 
>> apache_beam.runners.common.DoFnRunner.process 
>> (apache_beam/runners/common.c:10371)
>>     self.do_fn_invoker.invoke_process(windowed_value)
>>   File "apache_beam/runners/common.py", line 281, in 
>> apache_beam.runners.common.PerWindowInvoker.invoke_process 
>> (apache_beam/runners/common.c:8626)
>>     self._invoke_per_window(windowed_value)
>>   File "apache_beam/runners/common.py", line 307, in 
>> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
>> (apache_beam/runners/common.c:9302)
>>     windowed_value, self.process_method(*args_for_process))
>>   File 
>> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>>  line 749, in <lambda>
>>   File 
>> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>>  line 891, in <lambda>
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
>>  line 109, in _f
>>     return fnc(self, *args, **kwargs)
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
>> line 146, in initialize_write
>>     tmp_dir = self._create_temp_dir(file_path_prefix)
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
>> line 151, in _create_temp_dir
>>     base_path, last_component = FileSystems.split(file_path_prefix)
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 
>> 99, in split
>>     filesystem = FileSystems.get_filesystem(path)
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 
>> 61, in get_filesystem
>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>> ValueError: Unable to get the Filesystem for path 
>> s3://my-test-bucket/test_output [while running 
>> 'write/Write/WriteImpl/InitializeWrite']
>> {code}
>> I apologize for not providing full context or codebase, because a lot of the 
>> code we are running is internal, and some of it is tightly coupled to our 
>> infrastructure. If I run out of experimenting options, I'll try to narrow my 
>> use case down to the simplest case possible (like, override a gcs filesystem 
>> with a different path prefix or something).
>> I think there are several possibilities here:
>> 1. I'm doing something wrong, and it should be trivial to achieve something 
>> like that. This probably implies figuring out the right approach and writing 
>> some guideline for the sources/sinks page in the docs.
>> 2. The current order of imports is not optimal, and we could possibly import 
>> the side packages before initializing the filesystem classes. I currently 
>> possess too little knowledge about the way things get executed in Dataflow, 
>> so it's hard for me to tell how much it's worth diving that rabbithole.
>> 3. There just needs to be a better way of referring to additional filesystem 
>> classes. One way of doing that is to just specify a class name explicitly 
>> inside the ReadFromText and WriteToText functions (or something along these 
>> lines). PipelineOptions seems like an overkill for this, but may still be an 
>> option. Finally, maybe there could be just a function that gets called in 
>> the main script that somehow tells Beam to discover a specific class?
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.4.14#64029)

Reply via email to