On Mon, Mar 12, 2018 at 2:36 PM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> On 12 March 2018 at 22:22, "Chamikara Jayalath" <chamik...@google.com>
> wrote:
> On Mon, Mar 12, 2018 at 12:42 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>> On 12 March 2018 at 18:56, "Chamikara Jayalath" <chamik...@google.com>
>> wrote:
>> Agree. We need file-system abstractions in all languages since (1) users
>> may need to directly access file-systems from DoFns (2) common file-based
>> sources/sinks will probably be available in multiple languages even
>> with portability API and cross language IO (these are usually the first
>> sources/sinks that get implemented in an SDK and serve as reference
>> implementations for other sources/sinks).
>> Can you detail (1)? It shouldn't be necessary, otherwise the abstraction
>> is not that great.
>> Side note on this one: vfs uses static utilities to set up custom configs
>> for a specific impl; Beam can do the same and hide the unwrapping.
>> However, I'm a bit sad we always end up on the fs whereas the issue is
>> generic: how to bind some specific config to a transform?
>> Ex:
>> - parallelism by source in the direct runner and not globally
>> - different pool config for jdbc.read1 and jdbc.write1 because the
>> concurrency is different
>> - different jms provider for input1 and output2
>> - etc...
>> As soon as:
>> - the pipeline is "generic" (it takes a jms source/output or a file path)
>> Or
>> - an IO uses an implementation that is configurable but not exposed
>> through the main API
>> Then you need a config per transform of the pipeline. Filesystem is no
>> different from dozens of other cases, so it would be great to fix it once
>> and for all, no?
>> Thinking a bit more, and without using hints, we can just use the name if
>> set in apply:
>> p.apply("myprefix", ...) would accept -Dmyprefix::myOption=foo for the
>> scope of the fn and if not set it would try to use myOption (inheritance
>> from the global context). This way we are simple, generic, and our config
>> becomes scalable and stays storable nicely.
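
The name-scoped lookup described above could be sketched roughly as follows. This is only an illustration under stated assumptions: `ScopedOptions` and `resolve` are hypothetical names, not Beam APIs, and the `::` separator is simply taken from the `-Dmyprefix::myOption=foo` example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not a Beam API): resolves an option for a named transform. */
final class ScopedOptions {
    private final Map<String, String> values;

    ScopedOptions(Map<String, String> values) {
        // Defensive copy so later changes to the caller's map do not leak in.
        this.values = new HashMap<>(values);
    }

    /** Looks up "<transformName>::<option>" first, then falls back to the global "<option>". */
    Optional<String> resolve(String transformName, String option) {
        String scoped = values.get(transformName + "::" + option);
        if (scoped != null) {
            return Optional.of(scoped);
        }
        return Optional.ofNullable(values.get(option));
    }
}
```

With `myOption=global` and `myprefix::myOption=foo` in the map, a transform applied under the name "myprefix" would see `foo`, while every other transform would inherit `global`.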
> I think, at least for the state Beam is in currently, registered
> file-systems are global and available to all transforms. You basically have
> to invoke the static methods in FileSystems (note the extra 's'), for
> example FileSystems.match(), and the correct FileSystem will be picked up
> from a globally registered set of FileSystems based on the prefix of the
> file-pattern used. So we do have some form of the static utilities you
> mentioned. Use-case (1) is where a DoFn directly invokes one of the methods
> in the FileSystems class, for example to read a PCollection of filenames.
> File-based transforms contain other configs but these do not include
> configs for FileSystems (which are global).
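
To make the consequence of global registration concrete, here is a minimal sketch of a registry keyed by scheme. It is not Beam's actual implementation: `RegisteredFs` and `GlobalFsRegistry` are hypothetical names, and treating a spec without `://` as the "file" scheme is an assumption made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a configured file system; not Beam's class. */
final class RegisteredFs {
    final String scheme;
    final String region;

    RegisteredFs(String scheme, String region) {
        this.scheme = scheme;
        this.region = region;
    }
}

/** Hypothetical process-wide registry: one file system per scheme. */
final class GlobalFsRegistry {
    private static final Map<String, RegisteredFs> BY_SCHEME = new ConcurrentHashMap<>();

    /** Registers (or replaces) the single file system for its scheme. */
    static void register(RegisteredFs fs) {
        BY_SCHEME.put(fs.scheme, fs);
    }

    /** Extracts the scheme from a spec such as "s3://bucket/path". */
    static String schemeOf(String spec) {
        int idx = spec.indexOf("://");
        // Assumption for this sketch: no scheme means the local file system.
        return idx < 0 ? "file" : spec.substring(0, idx);
    }

    /** Picks the file system purely from the prefix of the spec. */
    static RegisteredFs forSpec(String spec) {
        RegisteredFs fs = BY_SCHEME.get(schemeOf(spec));
        if (fs == null) {
            throw new IllegalArgumentException("No file system registered for: " + spec);
        }
        return fs;
    }
}
```

Because the lookup key is only the scheme, "s3://src-bucket/*" and "s3://dst-bucket/" resolve to the same instance and therefore the same region, which is the limitation under discussion.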
> Agree, but I fail to see how it relates to the proposal? It is not a blocker

Yeah it's not a blocker and per-transform overriding of options can be
useful. But not sure if that fixes the original problem in general for
FileSystems (supporting multiple S3 regions or HDFS clusters in the same
pipeline) since file-system registration is global and overriding options
per-transform will not necessarily allow us to customize the set of
FileSystems available per transform (unless I'm missing something in your

For example, we use options.getAwsRegion() once per pipeline when
registering the S3FileSystem not per-transform:

> - Cham
>> - Cham
>> On Mon, Mar 12, 2018 at 10:48 AM Lukasz Cwik <lc...@google.com> wrote:
>>> There is still a lot of work before we get to supporting cross language
>>> transforms and hence get access to filesystems written in different
>>> languages but how the options are passed through from one to the other will
>>> need to be well understood and it would be best if the way a user defines
>>> these filesystems is the same in all languages because it would be annoying
>>> to provide the same configuration (in slightly different ways) for Java,
>>> Python, Go, ...
>>> On Fri, Mar 9, 2018 at 2:01 PM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>> On 9 March 2018 at 21:35, "Lukasz Cwik" <lc...@google.com> wrote:
>>>> The blocker is to get someone to follow through on the original design
>>>> or to get a new design (with feedback) and have it implemented.
>>>> If the PipelineOptionsFactory-related PRs are merged I can do a
>>>> PR/proposal based on this thread's draft this month.
>>>> Note that this impacts more than just Java as it also exists in Python
>>>> and Go.
>>>> Clearly outside my knowledge, but since it is mainly Java-backed it
>>>> should be almost transparent, no? If not, should it be part of the
>>>> portable API on top of runners?
>>>> On Fri, Mar 9, 2018 at 12:18 PM, Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>> Hmm, it doesn't solve the issue that Beam doesn't allow configuring a
>>>>> transform from its "config" (let's say the CLI).
>>>>> So if I have a generic pipeline taking a file as input and another as
>>>>> output then I must register 2 filesystems in all cases? If the pipeline
>>>>> is dynamic I must make it dynamic too?
>>>>> Sounds pretty bad for end users and not generic - all transforms hit
>>>>> this issue since Beam can't assume the impl. Using a prefix (namespace
>>>>> which can be implicit or not) is simple, straightforward and enables all
>>>>> cases to be handled smoothly for end users.
>>>>> What is the blocker to fix this design issue? I kind of fail to see
>>>>> why we end up on a few particular cases with workarounds right now :s.
>>>>> On 9 March 2018 at 19:00, "Jacob Marble" <jacobmar...@gmail.com> wrote:
>>>>>> I think when I wrote the S3 code, I couldn't see how to set storage
>>>>>> class per-bucket, so put it in a flag. It's easy to imagine a use case
>>>>>> where storage class differs per filespec, not only per bucket.
>>>>>> Jacob
>>>>>> On Fri, Mar 9, 2018 at 9:51 AM, Jacob Marble <jacobmar...@gmail.com>
>>>>>> wrote:
>>>>>>> Yes, I agree with all of this.
>>>>>>> Jacob
>>>>>>> On Thu, Mar 8, 2018 at 9:52 PM, Robert Bradshaw
>>>>>>> <rober...@google.com> wrote:
>>>>>>>> On Thu, Mar 8, 2018 at 9:38 PM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>> I think it may have been an API design mistake to put the S3
>>>>>>>>> region into PipelineOptions.
>>>>>>>> +1, IMHO it's generally a mistake to put any transform
>>>>>>>> configuration into PipelineOptions for exactly this reason.
>>>>>>>>> PipelineOptions are global per pipeline, whereas it's totally
>>>>>>>>> reasonable to access S3 files in different regions even from the
>>>>>>>>> code of a single DoFn running on a single element. The same
>>>>>>>>> applies to "setS3StorageClass".
>>>>>>>>> Jacob: what do you think? Why is it necessary to specify the S3
>>>>>>>>> region at all - can AWS infer it automatically? Per
>>>>>>>>> https://github.com/aws/aws-sdk-java/issues/1107 it seems that
>>>>>>>>> this is possible via a setting on the client, so that the
>>>>>>>>> specified region is used as the default but if the bucket is in a
>>>>>>>>> different region things still work.
>>>>>>>>> As for the storage class: so far nobody complained ;) but it
>>>>>>>>> should probably be specified via
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java
>>>>>>>>> instead of a pipeline option.
>>>>>>>>> On Thu, Mar 8, 2018 at 9:16 PM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>> The "hint" would probably be to use hints :) - indeed, this joke
>>>>>>>>>> refers to the hint thread.
>>>>>>>>>> Long story short, with hints you should be able to say "use that
>>>>>>>>>> specialized config here".
>>>>>>>>>> Now, personally, I'd like to see a way to specialize config per
>>>>>>>>>> transform. With a hint, an easy way is to use a prefix:
>>>>>>>>>> --s3-region would become --prefix_transform1-s3-region. But to
>>>>>>>>>> implement it I have https://github.com/apache/beam/pull/4683
>>>>>>>>>> which needs to be merged first ;).
>>>>>>>>>> On 8 March 2018 at 23:03, "Ismaël Mejía" <ieme...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> I was trying to create a really simple pipeline that reads from
>>>>>>>>>>> a bucket in a filesystem (s3) and writes to a different bucket
>>>>>>>>>>> in the same filesystem.
>>>>>>>>>>>     S3Options options =
>>>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).create().as(S3Options.class);
>>>>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>     pipeline
>>>>>>>>>>>       .apply("ReadLines", TextIO.read().from("s3://src-bucket/*"))
>>>>>>>>>>>       // .apply("AllOtherMagic", ...)
>>>>>>>>>>>       .apply("WriteCounts", TextIO.write().to("s3://dst-bucket/"));
>>>>>>>>>>>     pipeline.run().waitUntilFinish();
>>>>>>>>>>> I discovered that my original bucket was in a different region
>>>>>>>>>>> so I needed to pass a different S3Options object to the Write
>>>>>>>>>>> 'options.setAwsRegion("dst-region")', but I could not find a
>>>>>>>>>>> way to do it. Can somebody give me a hint on how to do this?
>>>>>>>>>>> Since file-based IOs use the configuration implied by the
>>>>>>>>>>> FileSystem, I was wondering whether this is possible. With
>>>>>>>>>>> non-file-based IOs all the configuration details are explicit
>>>>>>>>>>> in each specific transform, but this is not the case for these
>>>>>>>>>>> file-based transforms.
>>>>>>>>>>> Note: I know this question probably belongs more to user@ but
>>>>>>>>>>> since I couldn't find an easy way to do it I was wondering if
>>>>>>>>>>> this is an issue we should consider at dev@ from an API point
>>>>>>>>>>> of view.
