[
https://issues.apache.org/jira/browse/BEAM-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433335#comment-17433335
]
Beam JIRA Bot commented on BEAM-9528:
-------------------------------------
This issue was marked "stale-assigned" and has not received a public comment in
7 days. It is now automatically unassigned. If you are still working on it, you
can assign it to yourself again. Please also give an update about the status of
the work.
> Buggy/Slow FileIO.write()/sink implementation
> ---------------------------------------------
>
> Key: BEAM-9528
> URL: https://issues.apache.org/jira/browse/BEAM-9528
> Project: Beam
> Issue Type: Bug
> Components: io-java-files
> Affects Versions: 2.5.0, 2.19.0
> Reporter: Anand Singh Kunwar
> Priority: P3
>
> Context:
> I have been experimenting with generating columnar data from Prometheus
> metric data and writing it to Google Cloud Storage. My pipeline reads
> Prometheus Remote Write HTTP payloads from Kafka (snappy-compressed and
> protobuf-encoded); the first two steps of the pipeline decompress and decode
> each payload into a metric object. I window this input into fixed windows of
> 1 minute and write each window to GCS in ORC format. I have been seeing
> huge lag in my pipeline.
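>
> For illustration, the decompression step is just a DoFn over the Kafka
> records, roughly of this shape (a sketch only; the actual UncompressSnappy
> body is not shown in this report, and the snappy-java dependency is an
> assumption):
> {code:java}
> import java.io.IOException;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
> import org.xerial.snappy.Snappy;
>
> // Sketch: take the raw Kafka record value and emit the
> // snappy-uncompressed protobuf bytes for the decode step.
> public class UncompressSnappy extends DoFn<KV<Long, byte[]>, byte[]> {
>   @ProcessElement
>   public void processElement(@Element KV<Long, byte[]> record,
>       OutputReceiver<byte[]> out) throws IOException {
>     out.output(Snappy.uncompress(record.getValue()));
>   }
> }
> {code}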
>
> Problem/Bug:
> My custom FileIO.write() sink implementation (ORCIO) writes to GCS using the
> ORC library. To isolate the problem I implemented all of the sink's
> operations as no-ops, and even then I saw huge lag in my pipeline. When I
> comment out the FileIO transform (which at that point is a no-op), the
> pipeline keeps up with the input load.
> Searching online, my problem seems related to
> [https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets].
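>
> The no-op experiment was a FileIO.Sink whose methods all do nothing, along
> these lines (a minimal sketch; the class name NoOpSink is illustrative, and
> TSSample is my metric type):
> {code:java}
> import java.io.IOException;
> import java.nio.channels.WritableByteChannel;
> import org.apache.beam.sdk.io.FileIO;
>
> // No-op sink: open/write/flush all do nothing, so any remaining lag
> // cannot come from ORC encoding or from the bytes being written.
> public class NoOpSink implements FileIO.Sink<TSSample> {
>   @Override
>   public void open(WritableByteChannel channel) throws IOException {}
>
>   @Override
>   public void write(TSSample element) throws IOException {}
>
>   @Override
>   public void flush() throws IOException {}
> }
> {code}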
>
> I've tried running this on Dataflow.
>
> This is what my code looks like:
> {code:java}
> p.apply("ReadLines",
>         KafkaIO.<Long, byte[]>read()
>             .withBootstrapServers("mykafka:9092")
>             .withTopic(options.getInputTopic())
>             .withConsumerConfigUpdates(ImmutableMap.of(
>                 ConsumerConfig.GROUP_ID_CONFIG, "custom-id",
>                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
>             .withKeyDeserializer(LongDeserializer.class)
>             .withValueDeserializer(ByteArrayDeserializer.class)
>             .withoutMetadata())
>     .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
>     .apply("DecodeProto", ParDo.of(new DecodePromProto()))
>     .apply("MapTSSample", ParDo.of(new MapTSSample()))
>     .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
>     .apply(new WriteOneFilePerWindow(options.getOutput(), 1, ".orc"));
> {code}
>
> This is what WriteOneFilePerWindow.java's expand looks like for me:
> {code:java}
> @Override
> public PDone expand(PCollection<TSSample> input) {
>   input.apply(FileIO.<TSSample>write()
>       .to(filenamePrefix)
>       .withNaming(new MyFileNaming(filenameSuffix))
>       .withNumShards(numShards)
>       .via(ORCIO.sink()));
>   return PDone.in(input.getPipeline());
> }
> {code}
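>
> For completeness, MyFileNaming is a FileIO.Write.FileNaming; its body is not
> shown in this report, but a hypothetical version that names one file per
> window and shard would look roughly like this (a sketch only; the exact name
> format is an assumption):
> {code:java}
> import org.apache.beam.sdk.io.Compression;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
> import org.apache.beam.sdk.transforms.windowing.PaneInfo;
>
> // Hypothetical FileNaming: one file per (window, shard) pair.
> public class MyFileNaming implements FileIO.Write.FileNaming {
>   private final String suffix;
>
>   public MyFileNaming(String suffix) {
>     this.suffix = suffix;
>   }
>
>   @Override
>   public String getFilename(BoundedWindow window, PaneInfo pane,
>       int numShards, int shardIndex, Compression compression) {
>     // Encode the window bounds and shard index in the file name so
>     // each window/shard pair writes to a distinct object on GCS.
>     IntervalWindow w = (IntervalWindow) window;
>     return String.format("%s-%s-%d-of-%d%s",
>         w.start(), w.end(), shardIndex, numShards, suffix);
>   }
> }
> {code}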
--
This message was sent by Atlassian Jira
(v8.3.4#803005)