[ https://issues.apache.org/jira/browse/BEAM-9316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040149#comment-17040149 ]
Claire McGinty commented on BEAM-9316:
--------------------------------------
Thanks for taking a look, [~jkff] and [~iemejia]! My issue with it is that it
actually has a bug resolving the relative filename when it's nested like this.
For example, I ran this snippet on DirectRunner and DataflowRunner (sorry, more
Scala code...):
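{code:scala}
import com.google.common.collect.ImmutableList
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.{Compression, FileIO, TextIO}
import org.apache.beam.sdk.io.FileIO.Write.FileNaming
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, PaneInfo}

val pipeline: Pipeline = ...
pipeline
  .apply(Create.of(ImmutableList.of("a1", "b1")))
  .apply(
    FileIO
      .writeDynamic[String, String]()
      // Destination is the first character of each element: "a" or "b".
      .by((element: String) => element.charAt(0).toString)
      .to("gs://some_bucket/write_dynamic")
      .via(TextIO.sink())
      // This naming is already relative; because .to(...) is also set,
      // FileIO wraps it in relativeFileNaming a second time.
      .withNaming((dst: String) =>
        FileIO.Write.relativeFileNaming(
          StaticValueProvider.of("nested/directory"),
          new FileNaming {
            override def getFilename(
                window: BoundedWindow,
                pane: PaneInfo,
                numShards: Int,
                shardIndex: Int,
                compression: Compression
            ): String = s"file_${shardIndex}_${dst}.txt"
          }
        ))
      .withDestinationCoder(StringUtf8Coder.of())
  )
{code}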
On DataflowRunner, files are written to
gs://some_bucket/write_dynamic//nested/directory/file_0_b.txt and
gs://some_bucket/write_dynamic//nested/directory/file_0_a.txt (note the extra
forward slash; I think the FileSystems API prepends it to nested/directory
when it matches the resource).
On DirectRunner, files are written to
gs://some_bucket/write_dynamic//Users/clairemcginty/nested/directory/file_0_a.txt
and
gs://some_bucket/write_dynamic//Users/clairemcginty/nested/directory/file_0_b.txt.
Here it resolves the absolute path of "nested/directory" on my local
filesystem and appends that to gs://some_bucket/write_dynamic, again with the
double forward slash.
It seems that the extra forward slash gets prepended when
FileIO.Write.relativeFileNaming resolves the filename against the given
baseDirectory (here "nested/directory" becomes "/nested/directory").
(If I get rid of .to() and just use FileIO.Write.relativeFileNaming(
StaticValueProvider.of("gs://some_bucket/write_dynamic/nested/directory"), ...),
it works as expected.)
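To make the reported paths easier to compare, here is a minimal, hypothetical model of the double wrapping. It does not call Beam, and `NestedNamingModel`, `resolveBase`, `relative` and `nested` are illustrative names, not Beam APIs. It assumes that a base directory with no scheme and no leading "/" is made absolute against some working directory, and that each naming layer joins its base and the inner name with "/". Under those assumptions, an empty working directory reproduces the DataflowRunner paths above and /Users/clairemcginty reproduces the DirectRunner paths.

{code:scala}
// Hypothetical model of the nested naming; it does not call Beam.
// A Naming maps a shard index to a path.
object NestedNamingModel {
  type Naming = Int => String

  // Assumption: a base directory with no scheme and no leading "/" is made
  // absolute against `workingDir` before it is used.
  def resolveBase(base: String, workingDir: String): String =
    if (base.contains("://") || base.startsWith("/")) base
    else s"$workingDir/$base"

  // Simplified stand-in for relativeFileNaming: join the base directory and
  // the inner name with "/" (assumed join rule).
  def relative(base: String, inner: Naming): Naming =
    shard => s"$base/${inner(shard)}"

  // The user's own relative naming, wrapped once more by the output
  // directory passed to .to(...).
  def nested(outputDir: String, userBase: String, workingDir: String, dst: String): Naming = {
    val userNaming: Naming = shard => s"file_${shard}_${dst}.txt"
    relative(outputDir, relative(resolveBase(userBase, workingDir), userNaming))
  }

  def main(args: Array[String]): Unit = {
    val out = "gs://some_bucket/write_dynamic"
    println(nested(out, "nested/directory", "", "a")(0))                     // Dataflow-like
    println(nested(out, "nested/directory", "/Users/clairemcginty", "a")(0)) // Direct-like
  }
}
{code}

In this model, passing the fully qualified "gs://some_bucket/write_dynamic/nested/directory" as the base and skipping the outer layer yields a single slash, which matches the workaround of dropping .to().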
I think this is actually difficult to fix, since
FileSystems.matchNewResource tries to parse the path scheme, so passing just a
directory name doesn't resolve correctly. It seems more straightforward to
make the API easier to use correctly by renaming/deprecating things as you
suggested.
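For comparison, the precondition alternative from the issue description could be checked at expansion time roughly as sketched below. This is a hypothetical model, not Beam code: `NamingSpec`, `PlainNaming`, `RelativeNaming`, `DynamicWriteSpec` and `DynamicWriteChecks.validate` are made-up names, and the sketch assumes a relative naming can be recognized by its type, which is what lets the check fail fast.

{code:scala}
// Hypothetical sketch; none of these types exist in Beam. Relative naming is
// a marker case class so a dynamic write can detect it when it is expanded.
sealed trait NamingSpec
final case class PlainNaming(fileName: Int => String) extends NamingSpec
final case class RelativeNaming(baseDirectory: String, inner: NamingSpec) extends NamingSpec

final case class DynamicWriteSpec(outputDirectory: Option[String], naming: NamingSpec)

object DynamicWriteChecks {
  // Proposed precondition: reject an output directory combined with a
  // naming function that is already relative to some directory.
  def validate(spec: DynamicWriteSpec): Either[String, DynamicWriteSpec] =
    (spec.outputDirectory, spec.naming) match {
      case (Some(dir), RelativeNaming(base, _)) =>
        Left(s"outputDirectory '$dir' is set, but the naming is already relative to '$base'")
      case _ =>
        Right(spec)
    }

  def main(args: Array[String]): Unit = {
    val plain = PlainNaming(shard => s"file_${shard}.txt")
    val misused = DynamicWriteSpec(
      Some("gs://some_bucket/write_dynamic"),
      RelativeNaming("nested/directory", plain))
    println(validate(misused)) // prints a Left carrying the error message
  }
}
{code}

The design choice here is to make the misuse an explicit error rather than silently producing nested paths; renaming/deprecating relativeFileNaming would avoid needing such a check at all.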
> FileIO.Write.relativeFileNaming should not be public
> ----------------------------------------------------
>
> Key: BEAM-9316
> URL: https://issues.apache.org/jira/browse/BEAM-9316
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files
> Reporter: Claire McGinty
> Priority: Major
>
> I think the existing FileIO.writeDynamic is a bit too easy to misuse, since
> something like this looks correct and compiles:
>
> {code:scala}
> FileIO.writeDynamic()
>   .by(...)
>   .withNaming(new SerializableFunction[String, FileNaming] {
>     override def apply(str: String): FileNaming =
>       FileIO.Write.relativeFileNaming(
>         StaticValueProvider.of("some/directory"),
>         new FileNaming {
>           override def getFilename(window: BoundedWindow, pane: PaneInfo,
>               numShards: Int, shardIndex: Int, compression: Compression): String =
>             "some_filename.txt"
>         })
>   })
>   .via(...)
>   .to("gs://some/bucket")
> {code}
>
> However, for dynamic writes, if `outputDirectory` (.to("...")) is set, under
> the hood, Beam will wrap the provided `fileNamingFn` in
> `FileIO.Write.relativeFileNaming(...)` as well, so it ends up as a nested
> `relativeFileNaming` function (see
> [FileIO.java#L1243|https://github.com/apache/beam/blob/da9e17288e8473925674a4691d9e86252e67d7d7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1243]).
>
> IMO, `relativeFileNaming` should either be made private, so that it's only
> used internally by FileIO.Write, or a precondition should be added when a
> dynamic FileIO.Write is expanded, to check that `outputDirectory` is not
> set if the provided `fileNamingFn` is relative.
>
> wdyt?
>