[ https://issues.apache.org/jira/browse/BEAM-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433335#comment-17433335 ]

Beam JIRA Bot commented on BEAM-9528:
-------------------------------------

This issue was marked "stale-assigned" and has not received a public comment in 
7 days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.

> Buggy/Slow FileIO.write()/sink implementation
> ---------------------------------------------
>
>                 Key: BEAM-9528
>                 URL: https://issues.apache.org/jira/browse/BEAM-9528
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-files
>    Affects Versions: 2.5.0, 2.19.0
>            Reporter: Anand Singh Kunwar
>            Priority: P3
>
> Context:
> I have been experimenting with generating columnar data from Prometheus 
> metric data and writing it to Google Cloud Storage. My pipeline reads 
> Prometheus Remote Write HTTP payloads from Kafka (snappy-compressed and 
> protobuf-encoded); the first two steps of the pipeline decompress and decode 
> each payload into a metric object. I window this input into fixed windows 
> of 1 minute and write each window to GCS in ORC format. I have been seeing 
> a huge lag in my pipeline.
>  
> Problem/Bug:
> The custom FileIO.write() sink implementation, ORCIO, writes to GCS using 
> the ORC library. Even after implementing every operation of the sink as a 
> no-op, I still saw a huge lag in my pipeline. When I comment out the FileIO 
> transform (which at that point is effectively a no-op), the pipeline keeps 
> up with the input load.
> The problem appears to be the same as the one described in 
> [https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets].
>  
> I've tried running this on Dataflow.
>  
> This is what my code looks like:
> {code:java}
> p.apply("ReadLines", KafkaIO.<Long, byte[]>read()
>         .withBootstrapServers("mykafka:9092")
>         .withTopic(options.getInputTopic())
>         .withConsumerConfigUpdates(ImmutableMap.of(
>                 ConsumerConfig.GROUP_ID_CONFIG, "custom-id",
>                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withoutMetadata())
>         .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
>         .apply("DecodeProto", ParDo.of(new DecodePromProto()))
>         .apply("MapTSSample", ParDo.of(new MapTSSample()))
>         .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
>                 .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
>         .apply(new WriteOneFilePerWindow(options.getOutput(), 1, ".orc"));
> {code}
>  
> This is what WriteOneFilePerWindow.java's expand() looks like for me:
> {code:java}
> public PDone expand(PCollection<TSSample> input) {
>     input.apply(FileIO.<TSSample>write()
>         .to(filenamePrefix)
>         .withNaming(new MyFileNaming(filenameSuffix))
>         .withNumShards(numShards)
>         .via(ORCIO.sink()));
>     return PDone.in(input.getPipeline());
> }
> {code}
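>  
> The ORCIO.sink() and MyFileNaming used above are not included here; a 
> minimal sketch of the no-op variant of the sink and of the naming class, 
> assuming only Beam's FileIO.Sink and FileIO.Write.FileNaming interfaces 
> (class names and bodies below are placeholders, not the actual code), 
> looks roughly like:
> {code:java}
> import java.io.IOException;
> import java.nio.channels.WritableByteChannel;
> import org.apache.beam.sdk.io.Compression;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.transforms.windowing.PaneInfo;
> 
> // Hypothetical no-op sink matching the description above; the real ORCIO is not shown in this report.
> class NoOpOrcSink implements FileIO.Sink<TSSample> {
>     @Override
>     public void open(WritableByteChannel channel) throws IOException {
>         // no-op: nothing is ever written to the channel
>     }
> 
>     @Override
>     public void write(TSSample element) throws IOException {
>         // no-op: the element is dropped
>     }
> 
>     @Override
>     public void flush() throws IOException {
>         // no-op
>     }
> }
> 
> // Hypothetical naming class producing one file name per window and shard.
> class MyFileNaming implements FileIO.Write.FileNaming {
>     private final String suffix;
> 
>     MyFileNaming(String suffix) {
>         this.suffix = suffix;
>     }
> 
>     @Override
>     public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
>                               int shardIndex, Compression compression) {
>         // One file per window/shard, named after the window's end timestamp.
>         return window.maxTimestamp().toString() + "-" + shardIndex + suffix;
>     }
> }
> {code}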



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
