[
https://issues.apache.org/jira/browse/BEAM-638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999903#comment-15999903
]
Jean-Baptiste Onofré commented on BEAM-638:
-------------------------------------------
Agree, it's resolved. This is what I tested:
{code}
.apply(TextIO.write()
    .to("hdfs://localhost/uc2")
    .withFilenamePolicy(new PerWindowFiles("uc2"))
    .withWindowedWrites()
    .withNumShards(1));
{code}
with:
{code}
public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

  // Note: FORMATTER is not defined in this snippet; it is presumably a
  // Joda-Time DateTimeFormatter declared elsewhere in the test class.
  private final String prefix;

  public PerWindowFiles(String prefix) {
    this.prefix = prefix;
  }

  // Builds "<prefix>-<window start>-<window end>".
  public String filenamePrefixForWindow(IntervalWindow window) {
    return String.format("%s-%s-%s",
        prefix, FORMATTER.print(window.start()),
        FORMATTER.print(window.end()));
  }

  // Builds "<window prefix>-<shard>-of-<numShards><extension>" in the output directory.
  @Override
  public ResourceId windowedFilename(
      ResourceId outputDirectory,
      FileBasedSink.FilenamePolicy.WindowedContext context, String extension) {
    IntervalWindow window = (IntervalWindow) context.getWindow();
    String filename = String.format(
        "%s-%s-of-%s%s",
        filenamePrefixForWindow(window), context.getShardNumber(),
        context.getNumShards(),
        extension);
    return outputDirectory.resolve(filename,
        ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
  }

  // Only windowed writes are supported by this policy.
  @Nullable
  @Override
  public ResourceId unwindowedFilename(ResourceId resourceId, Context context, String s) {
    throw new UnsupportedOperationException("Unsupported.");
  }
}
{code}
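To make the naming scheme concrete, here is a minimal plain-Java sketch of the string that windowedFilename builds for one window and one shard. It does not use Beam: the class WindowedFilenameSketch, its method signatures, the ".txt" extension, and the "HH:mm" UTC formatter are all assumptions made for illustration (the actual FORMATTER is not shown in the comment above), and java.time stands in for Joda-Time.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for PerWindowFiles; not part of Beam.
final class WindowedFilenameSketch {

    // Assumption: windows are labelled by hour:minute in UTC.
    private static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

    // Mirrors filenamePrefixForWindow: "<prefix>-<start>-<end>".
    static String filenamePrefixForWindow(String prefix, Instant start, Instant end) {
        return String.format("%s-%s-%s",
            prefix, FORMATTER.format(start), FORMATTER.format(end));
    }

    // Mirrors the format string used in windowedFilename:
    // "<window prefix>-<shard>-of-<numShards><extension>".
    static String windowedFilename(String prefix, Instant start, Instant end,
                                   int shardNumber, int numShards, String extension) {
        return String.format("%s-%s-of-%s%s",
            filenamePrefixForWindow(prefix, start, end),
            shardNumber, numShards, extension);
    }

    public static void main(String[] args) {
        // A five-minute window, chosen arbitrarily for the example.
        Instant start = Instant.parse("2017-05-07T12:00:00Z");
        Instant end = Instant.parse("2017-05-07T12:05:00Z");
        System.out.println(windowedFilename("uc2", start, end, 0, 1, ".txt"));
    }
}
```

Under these assumptions, a pipeline configured with withNumShards(1) would produce one file name per window, of the form uc2-12:00-12:05-0-of-1.txt.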
> Add sink transform to write bounded data per window, pane, [and key] even
> when PCollection is unbounded
> -------------------------------------------------------------------------------------------------------
>
> Key: BEAM-638
> URL: https://issues.apache.org/jira/browse/BEAM-638
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Jean-Baptiste Onofré
> Assignee: Davor Bonaci
> Fix For: 2.0.0
>
>
> Today, if the pipeline source is unbounded and the sink expects a bounded
> collection, there's no way to use a single pipeline. Windowing splits the
> unbounded PCollection into chunks, but each windowed "sub" PCollection is
> still unbounded.
> It would be helpful for users to have a Window function that creates a bounded
> PCollection (on the window) from an unbounded PCollection coming from the
> source.