On Thu, Aug 10, 2017 at 8:29 AM, Reuven Lax <[email protected]> wrote:
> This is how the file sink has always worked in Beam. If no sharding is
> specified, then this means runner-determined sharding, and by default that
> is one file per bundle. If Flink has small bundles, then I suggest using
> the withNumShards method to explicitly pick the number of output shards.
>
> The Flink runner can detect that runner-determined sharding has been
> chosen and override it with a specific number of shards. For example, the
> Dataflow streaming runner (which, as you mentioned, also has small bundles)
> detects this case and sets the number of output file shards based on the
> number of workers in the worker pool. Here
> <https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354>
> is the code that does this; it should be quite simple to do something
> similar for Flink, and then there will be no need for users to explicitly
> call withNumShards themselves.
>
> On Thu, Aug 10, 2017 at 3:09 AM, Aljoscha Krettek <[email protected]> wrote:
>
>> Hi,
>>
>> I think Beam file writing was recently changed to write one file per
>> bundle. Currently, with the Flink streaming runner every element is
>> considered to be one bundle, i.e. all bundles have size 1. This means that
>> we write one file per output element.
>>
>> @Reuven, could you please confirm? Also, won't this be a problem for
>> other runners that can have very small bundle sizes? IIRC the Dataflow
>> runner also has rather small bundles in streaming mode; I'm not sure,
>> though.
>>
>> Best,
>> Aljoscha
>>
>> On 9. Aug 2017, at 19:14, Claire Yuan <[email protected]> wrote:
>>
>> Hi Aljoscha,
>>   I used the same sink as in the TfIdf example and set streamingOptions
>> to true, and then I got one record per file. Here is the function that
>> writes my output; I call it in my pipeline's main method.
>>
>> public static class WriteOut extends PTransform<PCollection<KV<String, String>>, PDone> {
>>   private final String output;
>>
>>   public WriteOut(String output) {
>>     this.output = output;
>>   }
>>
>>   @Override
>>   public PDone expand(PCollection<KV<String, String>> outputInfos) {
>>     return outputInfos
>>         .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
>>           @ProcessElement
>>           public void processElement(ProcessContext c) {
>>             c.output(String.format("%s:\t%s",
>>                 c.element().getKey(),
>>                 c.element().getValue()));
>>           }
>>         }))
>>         .apply(TextIO.write()
>>             .to(output) /* name of the output files */
>>             .withSuffix(".csv"));
>>   }
>> }
>>
>> Claire
>>
>>
>> On Wednesday, August 9, 2017 2:21 AM, Aljoscha Krettek <[email protected]> wrote:
>>
>>
>> Hi,
>>
>> I think Flink should not create one output file per record. Could you
>> maybe post a snippet or minimal code example that shows how you're setting
>> up your sinks?
>>
>> Best,
>> Aljoscha
>>
>> On 8. Aug 2017, at 19:08, Claire Yuan <[email protected]> wrote:
>>
>> Hi all,
>>   I am currently running some jobs written in Beam in streaming mode on a
>> Yarn session via Flink. My data sink was CSV files like the one in the
>> TfIdf example, and I noticed that Beam produces one output file for every
>> record, plus temp files alongside them. That causes my space usage to
>> exceed the maximum.
>>   I am not sure whether the problem is that I am using the API
>> incorrectly, but I am wondering if there is any way I can put all those
>> records into one file, keep updating that one file, or delete those temp
>> files by windowing or triggering?
>>
>> Claire
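
For readers hitting the same issue, here is a minimal sketch of the
user-side fix Reuven describes above, assuming Beam's 2.x TextIO API and
that formattedLines is the PCollection<String> produced by Claire's
"Format" step. The five-minute window size is an arbitrary example, not a
value from the thread:

  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PDone;
  import org.joda.time.Duration;

  static PDone writeWindowed(PCollection<String> formattedLines, String output) {
    return formattedLines
        // Group the unbounded stream into fixed windows so the sink has
        // finite panes to write (window size is an illustrative choice).
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply(TextIO.write()
            .to(output)
            .withSuffix(".csv")
            .withWindowedWrites()  // needed when writing an unbounded PCollection
            .withNumShards(1));    // one file per window pane, not one per element
  }

With runner-determined sharding replaced by an explicit shard count, each
window's pane is written as a single file rather than as one file per
element/bundle.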

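And a rough sketch of the runner-side override Reuven suggests, modeled
loosely on the DataflowRunner code linked above. WriteFiles.withNumShards
and FlinkPipelineOptions.getParallelism() are real Beam APIs, but the
helper itself, the getNumShards() accessor semantics, and the
2x-parallelism heuristic are assumptions for illustration, not actual
FlinkRunner code:

  import org.apache.beam.runners.flink.FlinkPipelineOptions;
  import org.apache.beam.sdk.io.WriteFiles;

  class FlinkShardingOverride {
    // Hypothetical helper, not actual FlinkRunner code: if the user left
    // sharding runner-determined, pick an explicit shard count derived from
    // the pipeline's parallelism, mirroring how the Dataflow streaming
    // runner derives it from the worker count.
    static <T> WriteFiles<T> maybeFixSharding(WriteFiles<T> write, FlinkPipelineOptions options) {
      if (write.getNumShards() != null) { // assumed: null means runner-determined sharding
        return write; // the user already chose an explicit shard count
      }
      int parallelism = Math.max(1, options.getParallelism());
      return write.withNumShards(2 * parallelism); // assumed heuristic
    }
  }

The runner would apply this during pipeline translation, so users would get
sensible file counts in streaming mode without calling withNumShards
themselves.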