[
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kenneth Knowles updated BEAM-2573:
----------------------------------
This Jira ticket has a pull request attached to it, but is still open. Did the
pull request resolve the issue? If so, could you please mark it resolved? This
will help the project have a clear view of its open issues.
> Better filesystem discovery mechanism in Python SDK
> ---------------------------------------------------
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
> Issue Type: Task
> Components: runner-dataflow, sdk-py-core
> Affects Versions: 2.0.0
> Reporter: Dmitry Demeshchuk
> Priority: P3
> Time Spent: 40m
> Remaining Estimate: 0h
>
> It looks like right now custom filesystem classes have to be imported
> explicitly:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
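> As far as I can tell, discovery boils down to a scan over whatever FileSystem
> subclasses have been imported. A simplified sketch of the pattern in
> filesystems.py (illustrative, not the exact Beam source):
> {code}
> # Simplified sketch of the pattern in apache_beam/io/filesystems.py
> # (illustrative, not the exact Beam source). Filesystems are found by
> # scanning FileSystem subclasses, so a subclass that was never imported
> # is invisible to the scan.
> from apache_beam.io.filesystem import FileSystem
> from apache_beam.io.localfilesystem import LocalFileSystem  # handles 'file' paths
> try:
>   from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem  # handles 'gs' paths
> except ImportError:
>   pass  # GCP extras not installed
>
> def get_filesystem(path):
>   scheme = path.split('://', 1)[0] if '://' in path else 'file'
>   for cls in FileSystem.__subclasses__():
>     if cls.scheme() == scheme:
>       return cls()
>   raise ValueError('Unable to get the Filesystem for path %s' % path)
> {code}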
> The current implementation doesn't seem to allow discovering filesystems
> that come from side packages rather than from apache_beam itself. Even if I
> put a custom FileSystem-inheriting class into a package and explicitly import
> it in the root __init__.py of that package, the class still isn't
> discoverable.
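> For concreteness, here's a hypothetical example of the kind of side-package
> class I mean (the names are made up; our real code is internal):
> {code}
> # mypackage/s3.py -- hypothetical side-package filesystem; the real class
> # implements the full FileSystem interface (match, create, open, etc.).
> from apache_beam.io.filesystem import FileSystem
>
> class S3FileSystem(FileSystem):
>   """Handles paths of the form s3://bucket/key."""
>
>   @classmethod
>   def scheme(cls):
>     return 's3'
> {code}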
> The problems I'm experiencing happen on the Dataflow runner, while the
> Direct runner works just fine. Here's an example of Dataflow output:
> {code}
> (320418708fe777d7): Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>     op.start()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py", line 54, in start
>     self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5768)
>     def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5654)
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in dataflow_worker.operations.ConsumerSet.receive (dataflow_worker/operations.c:3506)
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:11162)
>     with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:11116)
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156)
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458)
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11673)
>     raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371)
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.PerWindowInvoker.invoke_process (apache_beam/runners/common.c:8626)
>     self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window (apache_beam/runners/common.c:9302)
>     windowed_value, self.process_method(*args_for_process))
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 749, in <lambda>
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 891, in <lambda>
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 146, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 151, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 99, in split
>     filesystem = FileSystems.get_filesystem(path)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>     raise ValueError('Unable to get the Filesystem for path %s' % path)
> ValueError: Unable to get the Filesystem for path s3://my-test-bucket/test_output [while running 'write/Write/WriteImpl/InitializeWrite']
> {code}
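> The ValueError above is the subclass scan coming up empty for the 's3'
> scheme on the worker. One way to check which FileSystem subclasses are
> actually visible in a given process (a diagnostic sketch):
> {code}
> # Diagnostic sketch: list the URI schemes whose FileSystem subclasses
> # have been imported into the current process.
> from apache_beam.io.filesystem import FileSystem
> import apache_beam.io.filesystems  # triggers the built-in imports
>
> print(sorted(cls.scheme() for cls in FileSystem.__subclasses__()))
> # If 's3' is missing here (as it is on the Dataflow worker), get_filesystem
> # raises the ValueError shown above.
> {code}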
> I apologize for not providing full context or a codebase: a lot of the
> code we are running is internal, and some of it is tightly coupled to our
> infrastructure. If I run out of options to experiment with, I'll try to
> narrow my use case down to the simplest possible case (e.g., overriding the
> GCS filesystem with a different path prefix).
> I think there are several possibilities here:
> 1. I'm doing something wrong, and it should be trivial to achieve something
> like this. That probably implies figuring out the right approach and writing
> a guideline for the sources/sinks page in the docs.
> 2. The current order of imports is not optimal, and we could import the side
> packages before initializing the filesystem classes. I currently know too
> little about how things get executed in Dataflow, so it's hard for me to
> tell how much it's worth diving into that rabbit hole.
> 3. There just needs to be a better way of referring to additional filesystem
> classes. One option is to specify a class name explicitly inside the
> ReadFromText and WriteToText functions (or something along those lines).
> PipelineOptions seems like overkill for this, but may still be an option.
> Finally, maybe there could just be a function, called in the main script,
> that tells Beam to discover a specific class (sketched below)?
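> To make that last option concrete, here is a sketch of what such a
> registration hook could look like (register_filesystem is hypothetical and
> does not exist in Beam today):
> {code}
> # Hypothetical registration hook for option 3 (not part of Beam).
> from apache_beam.io.filesystem import FileSystem
>
> _REGISTERED = {}
>
> def register_filesystem(cls):
>   """Explicitly map a FileSystem subclass to its URI scheme."""
>   _REGISTERED[cls.scheme()] = cls
>   return cls  # usable as a class decorator
>
> # In the user's main script:
> #   from mypackage.s3 import S3FileSystem
> #   register_filesystem(S3FileSystem)
> {code}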
--
This message was sent by Atlassian Jira
(v8.20.1#820001)