Hi, I am currently working on a Beam pipeline (Flink runner) where we read JSON events from Kafka and write the output in Parquet format to S3.
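For context, the read side of the pipeline looks roughly like the sketch below; the broker list, topic, and the jsonToGenericRecord conversion are placeholders rather than our exact code:

    PCollection<GenericRecord> parquetRecord = pipeline
        .apply("Read From Kafka", KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrapServers)        // placeholder
            .withTopic(inputTopic)                         // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply("JSON To GenericRecord", MapElements
            .into(TypeDescriptor.of(GenericRecord.class))
            .via(json -> jsonToGenericRecord(json)))       // placeholder conversion
        .setCoder(AvroCoder.of(getOutput_schema()));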
We write to S3 every 10 minutes. We have observed that the pipeline sometimes stops writing to S3 after a restart (even for a minor, non-breaking code change). If we change the Kafka offsets and restart the pipeline, it starts writing to S3 again.

While the S3 writes are failing, the pipeline otherwise runs without any issues and processes records up to the FileIO stage. There are no errors or exceptions in the logs; it just silently stops writing to S3 at the FileIO stage. These are the transforms that send no records out:

FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles) ->
FileIO.Write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous)

We have checked our windowing by logging records after the Window transform, and windowing works fine.

This is our code snippet:

    parquetRecord
        .apply("Batch Events", Window.<GenericRecord>into(
                FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
            .discardingFiredPanes())
        .apply(Distinct.create())
        .apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(getOutput_schema()))
            .to(outputPath.isEmpty() ? outputPath() : outputPath)
            .withNumShards(1)
            .withNaming(new CustomFileNaming("snappy.parquet")));

Any idea what could be wrong here, or are there any known open bugs in Beam?

Regards,
Tapan Upadhyay
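P.S. The windowing check mentioned above was roughly a pass-through logging DoFn applied right after the Window transform, along these lines (a sketch with LOG as a placeholder logger, not our exact code):

    .apply("Log After Window", ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
            // Log the element's window and event timestamp, then pass it through unchanged.
            LOG.info("Element in window {} with timestamp {}", window, c.timestamp());
            c.output(c.element());
        }
    }))

This is how we verified that records keep flowing through the windowing step even when nothing reaches S3.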