Btw, I created an issue for this: https://issues.apache.org/jira/browse/BEAM-2873
> On 10. Aug 2017, at 17:40, Reuven Lax <[email protected]> wrote:
>
> On Thu, Aug 10, 2017 at 8:29 AM, Reuven Lax <[email protected]> wrote:
> This is how the file sink has always worked in Beam. If no sharding is
> specified, this means runner-determined sharding, and by default that is
> one file per bundle. If Flink has small bundles, then I suggest using the
> withNumShards method to explicitly pick the number of output shards.
>
> The Flink runner can detect that runner-determined sharding has been chosen
> and override it with a specific number of shards. For example, the Dataflow
> streaming runner (which, as you mentioned, also has small bundles) detects
> this case and sets the number of output file shards based on the number of
> workers in the worker pool. Here
> <https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354>
> is the code that does this; it should be quite simple to do something
> similar for Flink, and then there would be no need for users to explicitly
> call withNumShards themselves.
>
> On Thu, Aug 10, 2017 at 3:09 AM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
>
> I think Beam file writing was recently changed to write one file per
> bundle. Currently, with the Flink streaming runner, every element is
> considered to be one bundle, i.e. all bundles have size 1. This means that
> we write one file per output element.
>
> @Reuven, could you please confirm? Also, won't this be a problem for other
> runners that can have very small bundle sizes? IIRC the Dataflow runner
> also has rather small bundles in streaming mode; I'm not sure, though.
>
> Best,
> Aljoscha
>
>> On 9. Aug 2017, at 19:14, Claire Yuan <[email protected]> wrote:
>>
>> Hi Aljoscha,
>> I used the same sink as in the TfIdf example and set the streaming option
>> to true, and I got one record per file. Here is the function that writes
>> my output; I call it from my pipeline's main method.
>>
>> public static class WriteOut
>>     extends PTransform<PCollection<KV<String, String>>, PDone> {
>>   private final String output;
>>
>>   public WriteOut(String output) {
>>     this.output = output;
>>   }
>>
>>   @Override
>>   public PDone expand(PCollection<KV<String, String>> someInfos) {
>>     return someInfos
>>         .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
>>           @ProcessElement
>>           public void processElement(ProcessContext c) {
>>             c.output(String.format("%s:\t%s",
>>                 c.element().getKey(),
>>                 c.element().getValue()));
>>           }
>>         }))
>>         .apply(TextIO.write()
>>             .to(output) /* name of the output files */
>>             .withSuffix(".csv"));
>>   }
>> }
>>
>> Claire
>>
>> On Wednesday, August 9, 2017 2:21 AM, Aljoscha Krettek <[email protected]> wrote:
>>
>> Hi,
>>
>> I think Flink should not create one output file per record. Could you
>> maybe post a snippet or minimal code example that shows how you're setting
>> up your sinks?
>>
>> Best,
>> Aljoscha
>>
>>> On 8. Aug 2017, at 19:08, Claire Yuan <[email protected]> wrote:
>>>
>>> Hi all,
>>> I am currently running some Beam jobs in streaming mode on a YARN session
>>> with Flink. My data sink is CSV files, like the one in the TfIdf example.
>>> I noticed that Beam produces one output file for every record, plus temp
>>> files for them, which makes my disk usage exceed the maximum.
>>> I am not sure whether the problem is that I used the API incorrectly, but
>>> I am wondering if there is any way I can put all those records into one
>>> file, keep updating that file, or delete those temp files by windowing or
>>> triggering?
>>>
>>> Claire
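To make Reuven's first suggestion (the user-side workaround) concrete, here is a
minimal sketch of a sink with an explicit shard count, assuming a
PCollection<String> of already-formatted lines; the class name
WriteWithFixedShards and the shard count of 2 are placeholders, not anything
from the thread:

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

// Sketch: fixing the shard count so the runner cannot fall back to
// runner-determined (one-file-per-bundle) sharding. All records land in
// numShards files per window/pane instead of one file per record.
public class WriteWithFixedShards {
  public static PDone write(PCollection<String> lines, String output) {
    return lines.apply(TextIO.write()
        .to(output)           // base name of the output files
        .withSuffix(".csv")
        .withNumShards(2));   // arbitrary choice: exactly 2 output shards
  }
}

For an unbounded (streaming) input, the 2.x-era TextIO also requires windowed
writes, so you would likely add .withWindowedWrites() to the chain as well.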

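To make the second suggestion (a Flink-runner-side override) concrete, a rough
sketch of a Flink analog of Dataflow's StreamingShardedWriteFactory could look
as follows, written against the 2017-era APIs where WriteFiles<T> produces
PDone. The class name FlinkShardedWriteFactory and the parallelism-based
heuristic are assumptions for illustration, not existing Flink-runner code:

import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical Flink analog of Dataflow's StreamingShardedWriteFactory:
// replaces a WriteFiles that uses runner-determined sharding with one that
// has a fixed shard count derived from the job's parallelism.
class FlinkShardedWriteFactory<T>
    implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
  private static final int DEFAULT_NUM_SHARDS = 10;
  private final FlinkPipelineOptions options;

  FlinkShardedWriteFactory(PipelineOptions options) {
    this.options = options.as(FlinkPipelineOptions.class);
  }

  @Override
  public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
      AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
    // Derive the shard count from the configured parallelism so that the
    // write stays fully parallel without producing one file per bundle.
    int numShards =
        options.getParallelism() > 0 ? options.getParallelism() : DEFAULT_NUM_SHARDS;
    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform),
        transform.getTransform().withNumShards(numShards));
  }

  @Override
  public Map<PValue, ReplacementOutput> mapOutputs(
      Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
    // WriteFiles produces no replaceable outputs in this API version.
    return Collections.emptyMap();
  }
}

The runner would register this mirroring the linked Dataflow code, roughly via
PTransformOverride.of(PTransformMatchers.writeWithRunnerDeterminedSharding(),
new FlinkShardedWriteFactory<>(options)), so users would never need to call
withNumShards themselves.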