[ https://issues.apache.org/jira/browse/BEAM-638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999903#comment-15999903 ]

Jean-Baptiste Onofré commented on BEAM-638:
-------------------------------------------

Agreed, it's resolved. This is what I tested:

{code}
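                .apply(TextIO.write()
                        .to("hdfs://localhost/uc2")
                        // Names each file after its window (class below).
                        .withFilenamePolicy(new PerWindowFiles("uc2"))
                        // Unbounded input needs windowed writes...
                        .withWindowedWrites()
                        // ...and an explicit number of shards.
                        .withNumShards(1));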
{code}

with: 

{code}
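import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

    // Nested in the pipeline's main class. Names each output file after its
    // window bounds and shard: <prefix>-<start>-<end>-<shard>-of-<numShards><extension>.
    public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

        // FORMATTER was not defined in the original snippet; this colon-free
        // pattern is an assumption (HDFS does not accept ':' in path names).
        private static final DateTimeFormatter FORMATTER =
                DateTimeFormat.forPattern("yyyyMMdd'T'HHmmss").withZoneUTC();

        private final String prefix;

        public PerWindowFiles(String prefix) {
            this.prefix = prefix;
        }

        public String filenamePrefixForWindow(IntervalWindow window) {
            return String.format("%s-%s-%s",
                    prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
        }

        // Called per window and shard when withWindowedWrites() is set.
        // Assumes IntervalWindow-based windowing (e.g. FixedWindows).
        @Override
        public ResourceId windowedFilename(
                ResourceId outputDirectory,
                FileBasedSink.FilenamePolicy.WindowedContext context,
                String extension) {
            IntervalWindow window = (IntervalWindow) context.getWindow();
            String filename = String.format(
                    "%s-%s-of-%s%s",
                    filenamePrefixForWindow(window), context.getShardNumber(),
                    context.getNumShards(), extension);
            return outputDirectory.resolve(
                    filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        // This policy only supports windowed writes.
        @Nullable
        @Override
        public ResourceId unwindowedFilename(
                ResourceId outputDirectory,
                FileBasedSink.FilenamePolicy.Context context,
                String extension) {
            throw new UnsupportedOperationException("Unsupported.");
        }
    }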
{code}
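To see which file names such a policy produces, here is a plain-Java sketch with no Beam dependency that mirrors the two {{String.format}} calls in {{PerWindowFiles}}. The class name {{PerWindowFileName}}, the colon-free timestamp pattern and the {{.txt}} extension are hypothetical, since the comment does not show {{FORMATTER}} or the extension Beam passes in. With {{withNumShards(1)}}, each window should yield a single file, shard 0 of 1.

{code}
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Plain-Java sketch (no Beam) of the name PerWindowFiles builds:
// <prefix>-<windowStart>-<windowEnd>-<shard>-of-<numShards><extension>
public class PerWindowFileName {

    // Hypothetical colon-free format; the original FORMATTER is not shown.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

    static String fileName(String prefix, Instant start, Instant end,
                           int shard, int numShards, String extension) {
        // Same shape as filenamePrefixForWindow(...)
        String windowPrefix = String.format("%s-%s-%s",
                prefix, FORMATTER.format(start), FORMATTER.format(end));
        // Same shape as windowedFilename(...)
        return String.format("%s-%s-of-%s%s", windowPrefix, shard, numShards, extension);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-06-01T10:00:00Z");
        Instant end = start.plusSeconds(60); // a one-minute window
        // prints uc2-20170601T100000-20170601T100100-0-of-1.txt
        System.out.println(fileName("uc2", start, end, 0, 1, ".txt"));
    }
}
{code}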

> Add sink transform to write bounded data per window, pane, [and key] even when PCollection is unbounded
> -------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-638
>                 URL: https://issues.apache.org/jira/browse/BEAM-638
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Davor Bonaci
>             Fix For: 2.0.0
>
>
> Today, if the pipeline source is unbounded and the sink expects a bounded
> collection, there's no way to use a single pipeline. Even though a window
> creates a chunk of the unbounded PCollection, the "sub" PCollection is still
> unbounded.
> It would be helpful for users to have a Window function that creates a
> bounded PCollection (per window) from an unbounded PCollection coming from
> the source.
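The windowing idea in the description can be illustrated without Beam. The plain-Java sketch below (class and method names are hypothetical) assigns timestamped elements to fixed, non-overlapping windows, a simplified zero-offset version of what {{FixedWindows}} does. Each bucket is finite, which is why a windowed write can emit one bounded file per window. It ignores watermarks, triggers and late data, which a real runner has to handle before a window's file is complete.

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (no Beam): bucket timestamped elements into
// fixed windows. Each bucket is a finite collection of elements.
public class FixedWindowBuckets {

    // Start of the fixed window of size windowMillis that contains ts.
    // Math.floorMod keeps negative timestamps in the correct window.
    static long windowStart(long ts, long windowMillis) {
        return ts - Math.floorMod(ts, windowMillis);
    }

    // Groups (timestamp, value) pairs by window start, in window order.
    static Map<Long, List<String>> bucket(long[] timestamps, String[] values, long windowMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], windowMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 59_999L, 60_000L, 125_000L};
        String[] vals = {"a", "b", "c", "d"};
        // One-minute windows: [0, 60000), [60000, 120000), [120000, 180000)
        // prints {0=[a, b], 60000=[c], 120000=[d]}
        System.out.println(bucket(ts, vals, 60_000L));
    }
}
{code}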



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
