Hi,

I am currently working on a Beam pipeline (Flink runner) where we read JSON
events from Kafka and write the output to S3 in Parquet format.

We write to S3 every 10 minutes.
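
For context, the upstream read looks roughly like this (a simplified sketch;
the broker address, topic name and the JsonToGenericRecordFn parser are
placeholders, not our exact code):

PCollection<GenericRecord> parquetRecord = pipeline
        .apply("Read From Kafka", KafkaIO.<String, String>read()
                .withBootstrapServers("kafka:9092")              // placeholder
                .withTopic("events")                             // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        // Keep only the JSON payload.
        .apply(Values.create())
        // Our own DoFn that parses each JSON string into a GenericRecord.
        .apply("Json To GenericRecord", ParDo.of(new JsonToGenericRecordFn()))
        .setCoder(AvroCoder.of(getOutput_schema()));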

We have observed that our pipeline sometimes stops writing to S3 after a
restart (even for a non-breaking minor code change). If we change the Kafka
offsets and restart the pipeline, it starts writing to S3 again.

While the S3 write is failing, the pipeline itself runs without any issues
and processes records up to the FileIO stage. There are no errors or
exceptions in the logs; it just silently stops writing to S3 at the FileIO
stage.

This is the stage that is not emitting any records:

FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
-> FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> FileIO.Write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous)

We have checked our windowing function by logging records after the
windowing step; windowing works fine.
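
(The check is just a pass-through logging ParDo inserted between the Window
and Distinct steps of the snippet below; the exact log format here is
illustrative, and LOG is our usual SLF4J logger.)

        // Debug-only pass-through: log each record with its window and pane, then re-emit it.
        .apply("Log Windowed Records", ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
            @ProcessElement
            public void processElement(ProcessContext c, BoundedWindow window) {
                LOG.info("Window {} pane {} element {}", window, c.pane(), c.element());
                c.output(c.element());
            }
        }))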

This is our code snippet -

parquetRecord
        .apply("Batch Events", Window.<GenericRecord>into(
                        FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
                .discardingFiredPanes())
        .apply(Distinct.create())
        .apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath.isEmpty() ? outputPath() : outputPath)
                .withNumShards(1)
                .withNaming(new CustomFileNaming("snappy.parquet")));
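
If it helps, we could also capture the WriteFilesResult returned by
FileIO.write() and log the filenames it reports, to check whether this stage
finalizes any files at all. A rough sketch (not in our current pipeline;
windowedRecords stands for the output of the Distinct step above, and LOG is
an SLF4J logger):

        WriteFilesResult<Void> writeResult = windowedRecords
                .apply(FileIO.<GenericRecord>write()
                        .via(ParquetIO.sink(getOutput_schema()))
                        .to(outputPath.isEmpty() ? outputPath() : outputPath)
                        .withNumShards(1)
                        .withNaming(new CustomFileNaming("snappy.parquet")));

        // Log every file FileIO reports as finalized.
        writeResult.getPerDestinationOutputFilenames()
                .apply("Log Written Files", ParDo.of(new DoFn<KV<Void, String>, Void>() {
                    @ProcessElement
                    public void processElement(@Element KV<Void, String> file) {
                        LOG.info("FileIO wrote {}", file.getValue());
                    }
                }));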

Any idea what could be wrong here, or are there any known open bugs in Beam?


Regards,
Tapan Upadhyay
