[ 
https://issues.apache.org/jira/browse/BEAM-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17546616#comment-17546616
 ] 

Kenneth Knowles commented on BEAM-2448:
---------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/18463

> FileBasedSink writing to incorrect path when path prefix has no file 
> component in path (e.g. /tmp/)
> ----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-2448
>                 URL: https://issues.apache.org/jira/browse/BEAM-2448
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Luke Cwik
>            Priority: P3
>              Labels: Clarified
>
> This was reported by a user on dev@. Original report:
> https://lists.apache.org/thread.html/378da40ca7d13e226ca793d2a27af047f9a562f273f6eaa5d677dc4b@%3Cdev.beam.apache.org%3E
> This pipeline (where WindowedFilenamePolicy is the one found in 
> org.apache.beam.sdk.io.AvroIOTest) produces files in the wrong directory:
> {code}
> stringsPCollection
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
>     .apply("WritingToOutput",
>         TextIO.write().withWindowedWrites()
>             .withFilenamePolicy(new WindowedFilenamePolicy("my_pref"))
>             .to("/tmp/").withNumShards(1));
> {code}
> I expected the output files to be written to the /tmp/ directory, but they
> are written to the root directory (/) instead.
> I think the problem is in org.apache.beam.sdk.io.FileBasedSink.ExtractDirectory 
> ...
> This main method shows the problem:
> {code}
> import org.apache.beam.sdk.io.FileBasedSink;
> import org.apache.beam.sdk.io.fs.ResourceId;
> import org.apache.beam.sdk.options.ValueProvider;
> import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
> public class Test {
>   public static void main(String[] args) {
>     ResourceId res = FileBasedSink.convertToFileResourceIfPossible("/tmp/");
>     System.out.println("Resource is " + res + ", current directory is "
>         + res.getCurrentDirectory() + ", filename is " + res.getFilename());
>     FileBasedSink<String> mockFBS =
>         new FileBasedSink<String>(StaticValueProvider.of(res), null) {
>           @Override
>           public FileBasedSink.WriteOperation<String> createWriteOperation() {
>             return null;
>           }
>         };
>     final ValueProvider<ResourceId> provider =
>         mockFBS.getBaseOutputDirectoryProvider();
>     System.out.println("BaseOutputProvider is " + provider + ", isAccessible="
>         + provider.isAccessible() + ", getValue=" + provider.get());
>   }
> }
> {code}
> The output is
> {code}
> Resource is /tmp, current directory is //, filename is tmp
> BaseOutputProvider is NestedValueProvider{value=StaticValueProvider{value=/tmp}}, isAccessible=true, getValue=//
> {code}
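
The output above matches what happens when a trailing slash is dropped before the path is split into a directory and a filename. The sketch below reproduces that splitting with plain java.nio.file and no Beam classes. It is only an illustration, assuming a POSIX-style default filesystem. The class TrailingSlashDemo and the helpers describe and directoryOf are hypothetical names, and directoryOf is one possible way to honor the trailing slash, not the fix that was applied in Beam.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class TrailingSlashDemo {

    // Hypothetical helper: shows how java.nio.file splits a path string.
    // Paths.get drops the trailing slash, so "/tmp/" is parsed as "/tmp".
    static String describe(String spec) {
        Path p = Paths.get(spec);
        return "parent=" + p.getParent() + ", filename=" + p.getFileName();
    }

    // Hypothetical helper (not Beam's implementation): treat a spec that
    // ends in "/" as a directory, otherwise fall back to the parent.
    static String directoryOf(String spec) {
        if (spec.endsWith("/")) {
            return spec;
        }
        Path parent = Paths.get(spec).getParent();
        if (parent == null) {
            return "./"; // relative spec with no directory component
        }
        String dir = parent.toString();
        return dir.endsWith("/") ? dir : dir + "/";
    }

    public static void main(String[] args) {
        // On a POSIX-style filesystem this prints: parent=/, filename=tmp
        System.out.println(describe("/tmp/"));
        // Prints: /tmp/
        System.out.println(directoryOf("/tmp/"));
        // Prints: /tmp/
        System.out.println(directoryOf("/tmp/out"));
    }
}
```

Under these assumptions, "/tmp/" and "/tmp/out" both resolve to the /tmp/ directory, which is the behavior the reporter expected from the sink.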



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
