Do you have checkpointing enabled in your Flink cluster?
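
If I recall correctly, on the Flink runner transforms that require stable
input (which the finalize step of Beam's file writes uses) buffer their
elements until a checkpoint completes, so with checkpointing disabled the
temp files may never be finalized. A minimal sketch of enabling it through
Beam's FlinkPipelineOptions (the 60-second interval is an arbitrary
example):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Checkpoint every 60s; on the Flink runner, file-write finalization
    // waits for checkpoint completion.
    options.setCheckpointingInterval(60_000L);
    Pipeline pipeline = Pipeline.create(options);

The same option can also be passed on the command line as
--checkpointingInterval=60000.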

On Thu, Feb 18, 2021 at 11:50 AM Tapan Upadhyay <tap...@gmail.com> wrote:

> Hi,
>
> I am currently working on a Beam pipeline (Flink runner) where we read
> JSON events from Kafka and write the output in parquet format to S3.
>
> We write to S3 every 10 minutes.
>
> We have observed that our pipeline sometimes stops writing to S3 after a
> restart (even for a minor, non-breaking code change). If we change the
> Kafka offset and restart the pipeline, it starts writing to S3 again.
>
> While the S3 write fails, the pipeline otherwise runs fine and processes
> records up to the FileIO stage. There are no errors or exceptions in the
> logs; it simply stops writing to S3 at the FileIO stage.
>
> This is the stage that stops emitting records -
> FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
> FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
> -> FileIO.Write/WriteFiles/GatherTempFileResults/Add void
> key/AddKeys/Map/ParMultiDo(Anonymous)
>
> We have checked our windowing function by logging records after the Window
> transform, and windowing works fine.
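>
> Roughly, the check looked like this (a simplified sketch; LOG is our slf4j
> logger, and "windowed" stands for the output of the Window transform shown
> in the code below):
>
>     windowed.apply("Log Windowed Records",
>             ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c, BoundedWindow window) {
>                     // Log the window and pane each record lands in.
>                     LOG.info("window={} pane={} record={}", window, c.pane(), c.element());
>                     c.output(c.element());
>                 }
>             }));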
>
> This is our code snippet -
>
> parquetRecord
>     .apply("Batch Events", Window.<GenericRecord>into(
>                 FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
>             .discardingFiredPanes())
>     .apply(Distinct.create())
>     .apply(FileIO.<GenericRecord>write()
>             .via(ParquetIO.sink(getOutput_schema()))
>             .to(outputPath.isEmpty() ? outputPath() : outputPath)
>             .withNumShards(1)
>             .withNaming(new CustomFileNaming("snappy.parquet")));
>
> Any idea what could be wrong here, or are there any open bugs in Beam?
>
>
> Regards,
> Tapan Upadhyay
>
>
