[ 
https://issues.apache.org/jira/browse/BEAM-9316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040149#comment-17040149
 ] 

Claire McGinty commented on BEAM-9316:
--------------------------------------

Thanks for taking a look, [~jkff] and [~iemejia]! My issue with 
relativeFileNaming is that it actually has a bug resolving the relative 
filename when it's nested like this.

For example, I ran this snippet on DirectRunner and DataflowRunner (sorry, more 
Scala code...):

 
{code:java}
val pipeline: Pipeline = ...

pipeline.apply(
  Create.of(ImmutableList.of("a1", "b1"))
).apply(
  FileIO.writeDynamic()
    .by((element: String) => element.charAt(0).toString)
    .to("gs://some_bucket/write_dynamic")
    .via(TextIO.sink())
    .withNaming((dst: String) =>
      FileIO.Write.relativeFileNaming(
        StaticValueProvider.of("nested/directory"),
        new FileNaming {
          override def getFilename(
            window: BoundedWindow,
            pane: PaneInfo,
            numShards: Int,
            shardIndex: Int,
            compression: Compression
          ): String = s"file_${shardIndex}_${dst}.txt"
        }
      ))
    .withDestinationCoder(StringUtf8Coder.of())
)
{code}
 

 

In DataflowRunner, files are written to 
gs://some_bucket/write_dynamic//nested/directory/file_0_b.txt and 
gs://some_bucket/write_dynamic//nested/directory/file_0_a.txt (note the extra 
forward slash; I think the FileSystems API is prepending it to 
nested/directory when it matches the resource).

In DirectRunner, files are written to 
gs://some_bucket/write_dynamic//Users/clairemcginty/nested/directory/file_0_a.txt 
and 
gs://some_bucket/write_dynamic//Users/clairemcginty/nested/directory/file_0_b.txt. 
In this case it resolves the absolute path of "nested/directory" on my local 
FS and appends that to gs://some_bucket/write_dynamic, again with the double 
forward slash.

It seems that the extra forward slash is getting prepended when 
FileIO.Write.relativeFileNaming resolves the filename against the given 
baseDirectory (in this case "nested/directory" becomes "/nested/directory"), 
which produces the double slash once it is appended to the .to() directory. 
(If I get rid of .to() and just use FileIO.Write.relativeFileNaming( 
StaticValueProvider.of("gs://some_bucket/write_dynamic/nested/directory"), 
...), it works as expected.)
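
For reference, a minimal sketch of that working variant, assuming the same 
bucket and naming as the snippet above. The object and val names 
(NestedWriteWorkaround, baseDirectory, write) are made up for illustration, 
and the explicit SerializableFunction instances are only there so it does not 
depend on Scala's lambda-to-SAM conversion. The key difference is that there 
is no .to() call, and the full gs:// path goes into relativeFileNaming:

```scala
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.{Compression, FileIO, TextIO}
import org.apache.beam.sdk.io.FileIO.Write.FileNaming
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, PaneInfo}

// Hypothetical holder object; the names here are illustrative only.
object NestedWriteWorkaround {
  // Full output directory, including scheme and bucket, so .to() is not used.
  val baseDirectory: String = "gs://some_bucket/write_dynamic/nested/directory"

  val write: FileIO.Write[String, String] =
    FileIO.writeDynamic[String, String]()
      // Destination is the first character of each element, as in the repro.
      .by(new SerializableFunction[String, String] {
        override def apply(element: String): String = element.charAt(0).toString
      })
      .via(TextIO.sink())
      // Only one level of relativeFileNaming: nothing else wraps this function
      // because no outputDirectory is set via .to().
      .withNaming(new SerializableFunction[String, FileNaming] {
        override def apply(dst: String): FileNaming =
          FileIO.Write.relativeFileNaming(
            StaticValueProvider.of(baseDirectory),
            new FileNaming {
              override def getFilename(
                window: BoundedWindow,
                pane: PaneInfo,
                numShards: Int,
                shardIndex: Int,
                compression: Compression
              ): String = s"file_${shardIndex}_${dst}.txt"
            }
          )
      })
      .withDestinationCoder(StringUtf8Coder.of())
}
```

It would be applied to a PCollection[String] in place of the FileIO.writeDynamic() 
transform in the repro, e.g. input.apply(NestedWriteWorkaround.write).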

I think this is actually a difficult thing to fix since 
FileSystems.matchNewResource tries to parse the path scheme, so passing just a 
directory name doesn't work correctly. It seems more straightforward to just 
make the API easier to use correctly by renaming/deprecating things as you 
suggested.

 

> FileIO.Write.relativeFileNaming should not be public
> ----------------------------------------------------
>
>                 Key: BEAM-9316
>                 URL: https://issues.apache.org/jira/browse/BEAM-9316
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>            Reporter: Claire McGinty
>            Priority: Major
>
> I think the existing FileIO.writeDynamic is a bit easy to misuse, as 
> something like this looks correct, and compiles:
>  
> {{ FileIO.writeDynamic()}}
> {{  .by(...)}}
> {{  .withNaming(new SerializableFunction[String, FileNaming] {}}
> {{     override def apply(str: String): FileNaming =}}
> {{       FileIO.Write.relativeFileNaming(}}
> {{         "some/directory",}}
> {{         new FileNaming {}}
> {{           override def getFilename(window: BoundedWindow, pane: PaneInfo, numShards: Int, shardIndex: Int, compression: Compression): String = "some_filename.txt"}}
> {{         })}}
> {{  })}}
> {{  .via(...)}}
> {{  .to("gs://some/bucket")}}
>  
> However, for dynamic writes, if `outputDirectory` (.to("...")) is set, under 
> the hood, Beam will wrap the provided `fileNamingFn` in 
> `FileIO.Write.relativeFileNaming(...)` as well, so it ends up as a nested 
> `relativeFileNaming` function. 
> ([https://github.com/apache/beam/blob/da9e17288e8473925674a4691d9e86252e67d7d7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1243])
>  
> IMO, `relativeFileNaming` should either be made private, so that it's only 
> used internally by FileIO.Write, or a precondition should be added when a 
> dynamic FileIO.Write is expanded, to check that `outputDirectory` can't be 
> set if the provided `fileNamingFn` is relative.
>  
> wdyt?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
