Wouldn't you want to import filesystems to do validation of paths, etc.? (Also, registering an imported object is less error-prone than registering a string.)
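To make the object-vs-string point concrete, here is a minimal, self-contained sketch (not actual Beam API; `register_filesystem` and the registry are hypothetical names): registering an imported class object lets the registry validate it eagerly, whereas a dotted-string registration can only fail later, at lookup time inside a worker.

```python
class FileSystem(object):
    """Minimal stand-in for a filesystem base class (illustrative only)."""

    # Subclasses declare the URI scheme they handle, e.g. 's3' or 'gs'.
    scheme = None


_REGISTRY = {}


def register_filesystem(fs_class):
    """Hypothetical direct registration API taking an imported class object.

    Because we hold the real class, we can validate it immediately: a
    malformed registration fails here, at registration time, rather than
    deep inside a worker when a path is first resolved.
    """
    if not (isinstance(fs_class, type) and issubclass(fs_class, FileSystem)):
        raise TypeError('%r is not a FileSystem subclass' % (fs_class,))
    if not fs_class.scheme:
        raise ValueError('%r does not declare a URI scheme' % (fs_class,))
    _REGISTRY[fs_class.scheme] = fs_class
    return fs_class


def get_filesystem(path):
    """Resolve a path like 's3://bucket/key' to a registered filesystem."""
    scheme = path.split('://', 1)[0]
    try:
        return _REGISTRY[scheme]
    except KeyError:
        raise ValueError('Unable to get the Filesystem for path %s' % path)


class S3FileSystem(FileSystem):
    scheme = 's3'


register_filesystem(S3FileSystem)
```

With a string-based registry (say, `register_filesystem('mypkg.S3FileSystem')`), the same mistakes would only surface when the worker first tries to import and resolve the string against a real path.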
On Fri, Jul 7, 2017 at 9:29 PM, Chamikara Jayalath (JIRA) <j...@apache.org> wrote:
>
> [ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078949#comment-16078949 ]
>
> Chamikara Jayalath commented on BEAM-2573:
> ------------------------------------------
>
> Right. Usually you should not have to import FileSystem implementations
> during pipeline construction, and importing one just for the side effect
> of being included in beam_plugin has downsides (lint errors in the user
> program, a non-obvious API, reliance on runner-specific code). It might
> be better to have a more direct API for registering FileSystems.
>
>> Better filesystem discovery mechanism in Python SDK
>> ---------------------------------------------------
>>
>> Key: BEAM-2573
>> URL: https://issues.apache.org/jira/browse/BEAM-2573
>> Project: Beam
>> Issue Type: Task
>> Components: runner-dataflow, sdk-py
>> Affects Versions: 2.0.0
>> Reporter: Dmitry Demeshchuk
>> Priority: Minor
>>
>> It looks like right now custom filesystem classes have to be imported explicitly:
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>> The current implementation doesn't seem to allow discovering filesystems
>> that come from side packages rather than from apache_beam itself. Even if
>> I put a custom FileSystem-inheriting class into a package and explicitly
>> import it in the root __init__.py of that package, the class still isn't
>> discoverable.
>> The problems I'm experiencing happen on the Dataflow runner, while the
>> Direct runner works just fine.
>> Here's an example of Dataflow output:
>> {code}
>> (320418708fe777d7): Traceback (most recent call last):
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>     work_executor.execute()
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>>     op.start()
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py", line 54, in start
>>     self.output(windowed_value)
>>   File "dataflow_worker/operations.py", line 138, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5768)
>>     def output(self, windowed_value, output_index=0):
>>   File "dataflow_worker/operations.py", line 139, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5654)
>>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>>   File "dataflow_worker/operations.py", line 72, in dataflow_worker.operations.ConsumerSet.receive (dataflow_worker/operations.c:3506)
>>     cython.cast(Operation, consumer).process(windowed_value)
>>   File "dataflow_worker/operations.py", line 328, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:11162)
>>     with self.scoped_process_state:
>>   File "dataflow_worker/operations.py", line 329, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:11116)
>>     self.dofn_receiver.receive(o)
>>   File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156)
>>     self.process(windowed_value)
>>   File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458)
>>     self._reraise_augmented(exn)
>>   File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11673)
>>     raise new_exn, None, original_traceback
>>   File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371)
>>     self.do_fn_invoker.invoke_process(windowed_value)
>>   File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.PerWindowInvoker.invoke_process (apache_beam/runners/common.c:8626)
>>     self._invoke_per_window(windowed_value)
>>   File "apache_beam/runners/common.py", line 307, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window (apache_beam/runners/common.c:9302)
>>     windowed_value, self.process_method(*args_for_process))
>>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 749, in <lambda>
>>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 891, in <lambda>
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>>     return fnc(self, *args, **kwargs)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 146, in initialize_write
>>     tmp_dir = self._create_temp_dir(file_path_prefix)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 151, in _create_temp_dir
>>     base_path, last_component = FileSystems.split(file_path_prefix)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 99, in split
>>     filesystem = FileSystems.get_filesystem(path)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>> ValueError: Unable to get the Filesystem for path s3://my-test-bucket/test_output [while running 'write/Write/WriteImpl/InitializeWrite']
>> {code}
>> I apologize for not providing the full context or codebase; a lot of the
>> code we are running is internal, and some of it is tightly coupled to our
>> infrastructure. If I run out of options to experiment with, I'll try to
>> narrow my use case down to the simplest case possible (say, overriding the
>> GCS filesystem with a different path prefix).
>> I think there are several possibilities here:
>> 1. I'm doing something wrong, and it should be trivial to achieve this.
>> That probably implies figuring out the right approach and writing a
>> guideline for the sources/sinks page in the docs.
>> 2. The current order of imports is not optimal, and we could import the
>> side packages before initializing the filesystem classes. I currently know
>> too little about the way things get executed on Dataflow, so it's hard for
>> me to tell how far down that rabbit hole it's worth diving.
>> 3. There needs to be a better way of referring to additional filesystem
>> classes. One option is to specify a class name explicitly in the
>> ReadFromText and WriteToText functions (or something along those lines).
>> PipelineOptions seems like overkill for this, but may still be an option.
>> Finally, there could simply be a function called in the main script that
>> tells Beam to discover a specific class.
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.4.14#64029)
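As a bottom-posted aside on option 2 in the quoted issue: any discovery scheme built on subclass enumeration (or any other import side effect) can only see classes whose defining module has actually been imported in the worker process. A minimal illustration, with made-up class names standing in for the real SDK:

```python
class FileSystem(object):
    """Stand-in base class; subclasses declare the URI scheme they handle."""
    scheme = None


class GCSFileSystem(FileSystem):
    # Defined (imported) in this process, so discovery can see it.
    scheme = 'gs'


# An S3 filesystem living in a side package would subclass FileSystem too,
# but if the Dataflow worker never imports that package, the subclass simply
# does not exist in the process, and __subclasses__() cannot find it.


def get_filesystem(path):
    """Resolve a path by scanning the subclasses visible in this process."""
    scheme = path.split('://', 1)[0]
    for cls in FileSystem.__subclasses__():
        if cls.scheme == scheme:
            return cls
    raise ValueError('Unable to get the Filesystem for path %s' % path)
```

This is why the Direct runner (same process as the user's main module, which imports the side package) works while the Dataflow worker (a fresh process that imports only the SDK) raises the ValueError in the traceback above.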