Do you have checkpointing enabled in your Flink cluster? (A short sketch of the relevant Flink runner options is below the quoted message.)

On Thu, Feb 18, 2021 at 11:50 AM Tapan Upadhyay <tap...@gmail.com> wrote:
> Hi,
>
> I am currently working on a Beam pipeline (Flink runner) where we read
> JSON events from Kafka and write the output in Parquet format to S3.
>
> We write to S3 every 10 minutes.
>
> We have observed that our pipeline sometimes stops writing to S3 after
> restarts (even for a non-breaking minor code change). If we change the
> Kafka offset and restart the pipeline, it starts writing to S3 again.
>
> While the S3 write fails, the pipeline runs fine without any issues and
> processes records up to the FileIO stage. It gives no errors/exceptions in
> the logs but silently fails to write to S3 at the FileIO stage.
>
> This is the stage where it is not sending any records out:
>
> FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
> FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles) ->
> FileIO.Write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous)
>
> We have checked our windowing function by logging records after windowing;
> windowing works fine.
>
> This is our code snippet:
>
> parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
>             FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO,
>             Window.ClosingBehavior.FIRE_ALWAYS)
>         .discardingFiredPanes())
>
>     .apply(Distinct.create())
>
>     .apply(FileIO.<GenericRecord>write()
>         .via(ParquetIO.sink(getOutput_schema()))
>         .to(outputPath.isEmpty() ? outputPath() : outputPath)
>         .withNumShards(1)
>         .withNaming(new CustomFileNaming("snappy.parquet")));
>
> Any idea what could be wrong here, or any open bugs in Beam?
>
>
> Regards,
> Tapan Upadhyay
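If checkpointing is not enabled, it can also be turned on from the Beam side rather than only in flink-conf.yaml. A minimal sketch, assuming you build the pipeline options in Java (the option names come from the Flink runner's FlinkPipelineOptions; the 60-second interval is only an example value, not a recommendation for your job):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Sketch: enable Flink checkpointing through the Beam pipeline options.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    // By default the Flink runner does not configure a checkpointing interval.
    options.setCheckpointingInterval(60_000L); // milliseconds; example value only

    Pipeline pipeline = Pipeline.create(options);

The same setting can be passed on the command line as --checkpointingInterval=60000 when the pipeline is built with PipelineOptionsFactory.fromArgs(args).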