On Thu, Aug 10, 2017 at 8:29 AM, Reuven Lax <re...@google.com> wrote:

> This is how the file sink has always worked in Beam. If no sharding is
> specified, then this means runner-determined sharding, and by default that
> is one file per bundle. If Flink has small bundles, then I suggest using
> the withNumShards method to explicitly pick the number of output shards.
>
> The Flink runner can detect that runner-determined sharding has been
> chosen, and override it with a specific number of shards. For example, the
> Dataflow streaming runner (which as you mentioned also has small bundles)
> detects this case and sets the number of output file shards based on the
> number of workers in the worker pool. Here
> <https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354>
> is the code that does this; it should be quite simple to do something similar
> for Flink, and then there will be no need for users to explicitly call
> withNumShards themselves.
>
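To make the difference concrete, here is a minimal plain-Java sketch (no Beam
dependency) of the file counts involved. It is only a model under stated
assumptions: the class and method names are made up for illustration, records
are assigned to shards round-robin (Beam's actual assignment may differ), and
file names follow Beam's default "-SSSSS-of-NNNNN" shard template. In Claire's
transform the corresponding change would be adding .withNumShards(n) to the
TextIO.write() chain.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the two sharding modes discussed above; not Beam code.
final class ShardingSketch {

    // Runner-determined sharding defaulting to one file per bundle:
    // the file count is the number of bundles (ceiling division).
    static int filesWithPerBundleSharding(int records, int bundleSize) {
        return (records + bundleSize - 1) / bundleSize;
    }

    // Fixed sharding: every record lands in one of numShards files.
    // Round-robin assignment is an assumption made for this sketch.
    static Set<String> filesWithFixedSharding(
            int records, String prefix, String suffix, int numShards) {
        Set<String> files = new TreeSet<>();
        for (int i = 0; i < records; i++) {
            int shard = i % numShards;
            files.add(String.format(
                "%s-%05d-of-%05d%s", prefix, shard, numShards, suffix));
        }
        return files;
    }

    public static void main(String[] args) {
        // Bundle size 1, as described for the Flink streaming runner.
        System.out.println(filesWithPerBundleSharding(1000, 1));
        // The same 1000 records with an explicit shard count of 4.
        System.out.println(filesWithFixedSharding(1000, "out", ".csv", 4));
    }
}
```

With a bundle size of 1 the first count equals the number of records, while
the second call never produces more than the requested number of files,
however many records arrive.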
> On Thu, Aug 10, 2017 at 3:09 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> I think Beam File Writing was recently changed to write one file per
>> bundle. Currently, with the Flink Streaming Runner every element is
>> considered to be one bundle, i.e. all bundles have size 1. This means that
>> we write one file per output element.
>>
>> @Reuven, could you please confirm? Also, won't this be a problem for
>> other Runners that can have very small bundle sizes? IIRC the Dataflow
>> Runner also has rather small bundles in streaming mode, I'm not sure,
>> though.
>>
>> Best,
>> Aljoscha
>>
>> On 9. Aug 2017, at 19:14, Claire Yuan <clairey...@yahoo-inc.com> wrote:
>>
>> Hi Aljoscha,
>>   I used the same sink as in the TfIdf example and set the
>> streamingOptions to true, and I got one record per file. Here is the
>> transform that writes my output. I apply it in my pipeline's main method.
>>   public static class WriteOut extends PTransform<PCollection<KV<String,
>> String>>, PDone> {
>>     private String output;
>>     public WriteOut(String output) {
>>       this.output = output;
>>     }
>>     @Override
>>     public PDone expand(PCollection<KV<String, String>> someInfos) {
>>       return someInfos
>>           .apply("Format", ParDo.of(new DoFn<KV<String, String>,
>> String>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>               c.output(String.format("%s %s:\t%s",
>>                   c.element().getKey().getName(),
>>                   c.element().getKey().getNum(),
>>                   c.element().getValue()
>>                   ));
>>             }
>>           }))
>>           .apply(TextIO.write()
>>               .to(output) /* name of the output files */
>>               .withSuffix(".csv"));
>>     }
>>   }
>>
>> Claire
>>
>>
>> On Wednesday, August 9, 2017 2:21 AM, Aljoscha Krettek <
>> aljos...@apache.org> wrote:
>>
>>
>> Hi,
>>
>> I think Flink should not create one output file per record. Could you
>> maybe post a snippet or minimal code example that shows how you're setting
>> up your sinks?
>>
>> Best,
>> Aljoscha
>>
>> On 8. Aug 2017, at 19:08, Claire Yuan <clairey...@yahoo-inc.com> wrote:
>>
>> Hi all,
>>   I am currently running some Beam jobs in streaming mode on Flink, in a
>> YARN session. My data sink is CSV files, like the one in the TfIdf
>> example. I noticed that Beam produces one output file for every record,
>> plus temp files for them. As a result, my disk usage would exceed the
>> maximum.
>>   I am not sure whether the problem is that I used the API incorrectly,
>> but I am wondering if there is any way I can put all those records into
>> one file, or keep appending to that file, or delete those temp files by
>> windowing or triggering?
>>
>> Claire
>>
>
