[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078889#comment-16078889
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-----------------------------------------

Here's a quick example of the file being picked up locally:

{code}
In [1]: import dataflow
No handlers could be found for logger "oauth2client.contrib.multistore_file"

In [2]: from apache_beam.io.filesystem import FileSystem

In [3]: FileSystem.get_all_subclasses()
Out[3]:
[apache_beam.io.localfilesystem.LocalFileSystem,
 apache_beam.io.gcp.gcsfilesystem.GCSFileSystem,
 dataflow.aws.s3.S3FileSystem]
{code}

dataflow is our internal Python package that we use for more robust interaction 
with Dataflow and custom sources/sinks/transforms. It definitely gets loaded at 
the end, because other pipelines that use it work just fine, and it's only the 
filesystems that can't pick up its classes.

> Better filesystem discovery mechanism in Python SDK
> ---------------------------------------------------
>
>                 Key: BEAM-2573
>                 URL: https://issues.apache.org/jira/browse/BEAM-2573
>             Project: Beam
>          Issue Type: Task
>          Components: runner-dataflow, sdk-py
>            Reporter: Dmitry Demeshchuk
>            Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> Seems like the current implementation doesn't allow discovering filesystems 
> that come from side packages, not from apache_beam itself. Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on Dataflow runner, while Direct runner 
> works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
>     op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
>     self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
>     def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
>     with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11116)
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> (apache_beam/runners/common.c:11673)
>     raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10371)
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> (apache_beam/runners/common.c:8626)
>     self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
> (apache_beam/runners/common.c:9302)
>     windowed_value, self.process_method(*args_for_process))
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>  line 749, in <lambda>
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>  line 891, in <lambda>
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
>  line 109, in _f
>     return fnc(self, *args, **kwargs)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 146, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 151, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 
> 99, in split
>     filesystem = FileSystems.get_filesystem(path)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 
> 61, in get_filesystem
>     raise ValueError('Unable to get the Filesystem for path %s' % path)
> ValueError: Unable to get the Filesystem for path 
> s3://my-test-bucket/test_output [while running 
> 'write/Write/WriteImpl/InitializeWrite']
> {code}
> I apologize for not providing full context or codebase, because a lot of the 
> code we are running is internal, and some of it is tightly coupled to our 
> infrastructure. If I run out of experimenting options, I'll try to narrow my 
> use case down to the simplest case possible (like, override a gcs filesystem 
> with a different path prefix or something).
> I think there are several possibilities here:
> 1. I'm doing something wrong, and it should be trivial to achieve something 
> like that. This probably implies figuring out the right approach and writing 
> some guideline for the sources/sinks page in the docs.
> 2. The current order of imports is not optimal, and we could possibly import 
> the side packages before initializing the filesystem classes. I currently 
> possess too little knowledge about the way things get executed in Dataflow, 
> so it's hard for me to tell how much it's worth diving that rabbithole.
> 3. There just needs to be a better way of referring to additional filesystem 
> classes. One way of doing that is to just specify a class name explicitly 
> inside the ReadFromText and WriteToText functions (or something along these 
> lines). PipelineOptions seems like an overkill for this, but may still be an 
> option. Finally, maybe there could be just a function that gets called in the 
> main script that somehow tells Beam to discover a specific class?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to