Anand Singh Kunwar created BEAM-9528:
----------------------------------------

             Summary: Buggy/Slow FileIO.write()/sink implementation
                 Key: BEAM-9528
                 URL: https://issues.apache.org/jira/browse/BEAM-9528
             Project: Beam
          Issue Type: Bug
          Components: io-java-files
    Affects Versions: 2.19.0, 2.5.0
            Reporter: Anand Singh Kunwar


Context:
I have been experimenting with generating columnar data from Prometheus metric 
data and writing it to Google Cloud Storage. My pipeline reads Prometheus 
Remote Write HTTP payloads from Kafka (snappy-compressed and protobuf-encoded); 
the first two steps of the pipeline decompress and decode each payload into a 
metric object. I window this input into fixed one-minute windows and write 
each window to GCS in ORC format. I have been seeing huge lag in my pipeline.

 
Problem/Bug:
My custom FileIO.write() sink implementation (ORCIO) writes to GCS using the 
ORC library. Even after I reduced every operation in the sink to a no-op, the 
pipeline still lagged badly. When I comment out the FileIO transform (which at 
that point does nothing at all), the pipeline keeps up with the input load.
This looks related to 
[https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets].
 
I've tried running this on Dataflow.
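For reference, the no-op variant of the sink I tested looks roughly like this 
(a minimal sketch; the class name is illustrative, and TSSample is my metric 
type):
{code:java}
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileIO;

// Sketch of the sink with every operation reduced to a no-op. Even with
// this sink the pipeline lags; with the whole FileIO transform removed,
// it keeps up.
public class NoOpORCSink implements FileIO.Sink<TSSample> {
    @Override
    public void open(WritableByteChannel channel) throws IOException {
        // no-op: the output channel is ignored
    }

    @Override
    public void write(TSSample element) throws IOException {
        // no-op: elements are dropped without any ORC encoding
    }

    @Override
    public void flush() throws IOException {
        // no-op: nothing is buffered, so nothing to flush
    }
}
{code}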
 
This is what my code looks like:
{code:java}
p.apply("ReadLines", KafkaIO.<Long, byte[]>read()
        .withBootstrapServers("mykafka:9092")
        .withTopic(options.getInputTopic())
        .withConsumerConfigUpdates(ImmutableMap.of(
                ConsumerConfig.GROUP_ID_CONFIG, "custom-id",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withoutMetadata())
        // Decompress the snappy payload and decode the Remote Write protobuf.
        .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
        .apply("DecodeProto", ParDo.of(new DecodePromProto()))
        .apply("MapTSSample", ParDo.of(new MapTSSample()))
        // Fixed one-minute windows; each window becomes one ORC file on GCS.
        .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
                .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
        .apply(new WriteOneFilePerWindow(options.getOutput(), 1, ".orc"));
{code}
 
This is what expand() in my WriteOneFilePerWindow.java looks like:

{code:java}
public PDone expand(PCollection<TSSample> input) {
    input.apply(FileIO.<TSSample>write()
        .to(filenamePrefix)
        .withNaming(new MyFileNaming(filenameSuffix))
        .withNumShards(numShards)
        .via(ORCIO.sink()));
    return PDone.in(input.getPipeline());
}
{code}
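
MyFileNaming is not shown above; it simply derives one file name per window 
pane. A sketch of what it looks like (the exact format string is illustrative):
{code:java}
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

public class MyFileNaming implements FileIO.Write.FileNaming {
    private final String suffix;

    public MyFileNaming(String suffix) {
        this.suffix = suffix;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane,
            int numShards, int shardIndex, Compression compression) {
        // Name each shard after the window's time range so one-minute
        // windows map to distinct files.
        IntervalWindow w = (IntervalWindow) window;
        return String.format("%s-%s-%d-of-%d%s",
                w.start(), w.end(), shardIndex, numShards, suffix);
    }
}
{code}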



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
