Dmitry Demeshchuk created BEAM-2573:
---------------------------------------
Summary: Better filesystem discovery mechanism in Python SDK
Key: BEAM-2573
URL: https://issues.apache.org/jira/browse/BEAM-2573
Project: Beam
Issue Type: Task
Components: runner-dataflow, sdk-py
Reporter: Dmitry Demeshchuk
Assignee: Thomas Groh
Priority: Minor
It looks like right now custom filesystem classes have to be imported
explicitly:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
Seems like the current implementation doesn't allow discovering filesystems
that come from side packages, not from apache_beam itself. Even if I put a
custom FileSystem-inheriting class into a package and explicitly import it in
the root __init__.py of that package, it still doesn't make the class
discoverable.
The problems I'm experiencing happen on Dataflow runner, while Direct runner
works just fine. Here's an example of Dataflow output:
{code}
(320418708fe777d7): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 581, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 166, in execute
op.start()
File
"/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
line 54, in start
self.output(windowed_value)
File "dataflow_worker/operations.py", line 138, in
dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5768)
def output(self, windowed_value, output_index=0):
File "dataflow_worker/operations.py", line 139, in
dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5654)
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "dataflow_worker/operations.py", line 72, in
dataflow_worker.operations.ConsumerSet.receive
(dataflow_worker/operations.c:3506)
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/operations.py", line 328, in
dataflow_worker.operations.DoOperation.process
(dataflow_worker/operations.c:11162)
with self.scoped_process_state:
File "dataflow_worker/operations.py", line 329, in
dataflow_worker.operations.DoOperation.process
(dataflow_worker/operations.c:11116)
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in
apache_beam.runners.common.DoFnRunner.receive
(apache_beam/runners/common.c:10156)
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in
apache_beam.runners.common.DoFnRunner.process
(apache_beam/runners/common.c:10458)
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
(apache_beam/runners/common.c:11673)
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in
apache_beam.runners.common.DoFnRunner.process
(apache_beam/runners/common.c:10371)
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 281, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
(apache_beam/runners/common.c:8626)
self._invoke_per_window(windowed_value)
File "apache_beam/runners/common.py", line 307, in
apache_beam.runners.common.PerWindowInvoker._invoke_per_window
(apache_beam/runners/common.c:9302)
windowed_value, self.process_method(*args_for_process))
File
"/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
line 749, in <lambda>
File
"/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
line 891, in <lambda>
File
"/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
line 109, in _f
return fnc(self, *args, **kwargs)
File
"/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line
146, in initialize_write
tmp_dir = self._create_temp_dir(file_path_prefix)
File
"/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line
151, in _create_temp_dir
base_path, last_component = FileSystems.split(file_path_prefix)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py",
line 99, in split
filesystem = FileSystems.get_filesystem(path)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py",
line 61, in get_filesystem
raise ValueError('Unable to get the Filesystem for path %s' % path)
ValueError: Unable to get the Filesystem for path
s3://my-test-bucket/test_output [while running
'write/Write/WriteImpl/InitializeWrite']
{code}
I apologize for not providing full context or codebase, because a lot of the
code we are running is internal, and some of it is tightly coupled to our
infrastructure. If I run out of experimenting options, I'll try to narrow my
use case down to the simplest case possible (like, override a gcs filesystem
with a different path prefix or something).
I think there are several possibilities here:
1. I'm doing something wrong, and it should be trivial to achieve something
like that. This probably implies figuring out the right approach and writing
some guideline for the sources/sinks page in the docs.
2. The current order of imports is not optimal, and we could possibly import
the side packages before initializing the filesystem classes. I currently
possess too little knowledge about the way things get executed in Dataflow, so
it's hard for me to tell how much it's worth diving that rabbithole.
3. There just needs to be a better way of referring to additional filesystem
classes. One way of doing that is to just specify a class name explicitly
inside the ReadFromText and WriteToText functions (or something along these
lines). PipelineOptions seems like an overkill for this, but may still be an
option. Finally, maybe there could be just a function that gets called in the
main script that somehow tells Beam to discover a specific class?
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)