Btw, I created an issue for this: 
https://issues.apache.org/jira/browse/BEAM-2873 

> On 10. Aug 2017, at 17:40, Reuven Lax <[email protected]> wrote:
> 
> 
> 
> On Thu, Aug 10, 2017 at 8:29 AM, Reuven Lax <[email protected]> wrote:
> This is how the file sink has always worked in Beam. If no sharding is 
> specified, then this means runner-determined sharding, and by default that is 
> one file per bundle. If Flink has small bundles, then I suggest using the 
> withNumShards method to explicitly pick the number of output shards.
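> 
> For instance, something along these lines pins the shard count on the sink 
> (rough sketch, untested; the variable names and the shard count of 1 are just 
> placeholders):
> 
>   records
>       .apply(TextIO.write()
>           .to(output)          // output file prefix
>           .withSuffix(".csv")
>           .withNumShards(1));  // force a fixed number of output files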
> 
> The Flink runner can detect that runner-determined sharding has been chosen, 
> and override it with a specific number of shards. For example, the Dataflow 
> streaming runner (which as you mentioned also has small bundles) detects this 
> case and sets the number of output file shards based on the number of workers 
> in the worker pool. Here 
> <https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354>
>  is the code that does this; it should be quite simple to do something 
> similar for Flink, and then there will be no need for users to explicitly 
> call withNumShards themselves.
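> 
> Until the Flink runner does that, you can get a similar effect yourself by 
> deriving the shard count from the pipeline's parallelism (rough sketch, 
> untested; assumes you are using FlinkPipelineOptions and have set the 
> parallelism explicitly, otherwise it falls back to a single shard):
> 
>   FlinkPipelineOptions flinkOptions =
>       PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
>   // getParallelism() defaults to -1 when not set, hence the fallback to 1.
>   int numShards = flinkOptions.getParallelism() > 0 ? flinkOptions.getParallelism() : 1;
> 
>   records.apply(TextIO.write()
>       .to(output)
>       .withSuffix(".csv")
>       .withNumShards(numShards));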
> 
> On Thu, Aug 10, 2017 at 3:09 AM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
> 
> I think Beam File Writing was recently changed to write one file per bundle. 
> Currently, with the Flink Streaming Runner every element is considered to be 
> one bundle, i.e. all bundles have size 1. This means that we write one file 
> per output element.
> 
> @Reuven, could you please confirm? Also, won't this be a problem for other 
> Runners that can have very small bundle sizes? IIRC the Dataflow Runner also 
> has rather small bundles in streaming mode, though I'm not sure.
> 
> Best,
> Aljoscha
> 
>> On 9. Aug 2017, at 19:14, Claire Yuan <[email protected]> wrote:
>> 
>> Hi Aljoscha,
>>   I used the same sink as in the TfIdf example and set the streaming option 
>> to true, and then I got one record per file. Here is the function that writes 
>> my output; I call it in my pipeline's main method.
>>   public static class WriteOut extends PTransform<PCollection<KV<String, String>>, PDone> {
>>     private final String output;
>>     public WriteOut(String output) {
>>       this.output = output;
>>     }
>>     @Override
>>     public PDone expand(PCollection<KV<String, String>> someInfos) {
>>       return someInfos
>>           // Format each key/value pair as a single text line.
>>           .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>               c.output(String.format("%s:\t%s",
>>                   c.element().getKey(),
>>                   c.element().getValue()));
>>             }
>>           }))
>>           .apply(TextIO.write()
>>               .to(output) /* name of the output files */
>>               .withSuffix(".csv"));
>>     }
>>   }
>> 
>> Claire
>> 
>> 
>> On Wednesday, August 9, 2017 2:21 AM, Aljoscha Krettek <[email protected]> wrote:
>> 
>> 
>> Hi,
>> 
>> I think Flink should not create one output file per record. Could you maybe 
>> post a snippet or a minimal code example that shows how you're setting up 
>> your sinks?
>> 
>> Best,
>> Aljoscha
>>> On 8. Aug 2017, at 19:08, Claire Yuan <[email protected]> wrote:
>>> 
>>> Hi all,
>>>   I am currently running some jobs coded in Beam in streaming mode on a Yarn 
>>> session via Flink. My data sink was CSV files, like the one in the TfIdf 
>>> example, and I noticed that Beam produces one output file for every record, 
>>> plus temp files for them. That causes my space usage to exceed the maximum. 
>>>   I am not sure whether the problem is that I used the API incorrectly, but 
>>> I am wondering if there is any way I can put all those records into one file, 
>>> keep updating that file, or delete those temp files by windowing or 
>>> triggering?
>>> 
>>> Claire
