Samrat002 commented on code in PR #21458:
URL: https://github.com/apache/flink/pull/21458#discussion_r1284527153
##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:
##########
@@ -126,7 +126,16 @@ public long getPos() throws IOException {
@Override
public void sync() throws IOException {
- fileStream.sync();
Review Comment:
The `sync` method is called in the following scenarios:
1. `S3RecoverableWriter`
2. `FlinkS3FileSystem` creates a new instance of `S3RecoverableWriter` when
`createRecoverableWriter()` is called.
3. `CsvBulkWriter` uses `FlinkS3FileSystem` and calls the recoverable writer.
4. `BulkWriter`
This change will not alter any processing guarantee.
With the current changes, `sync()` takes the lock first, then calls the
filesystem flush and commits the remaining blocks (writes them to S3).
This flow results in exactly-once semantics. The same code flow is implemented
for `AzureBlobFsRecoverableDataOutputStream`.
For reference, from the class `BlockBlobAppendStream`:
```
public void hsync() throws IOException {
    if (this.compactionEnabled) {
        this.flush();
    }
}
```
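To make the lock, flush, commit ordering described above concrete, here is a minimal sketch. It is not the code from this PR: `SyncSketch`, `InMemoryStore` and `BufferedPartStream` are hypothetical names, and the in-memory list only stands in for S3. It just illustrates that `sync()` holds the lock while it flushes and uploads the buffered bytes, so a repeated `sync()` with nothing buffered uploads nothing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a lock -> flush -> commit sync(); not the Flink class. */
public class SyncSketch {

    /** Stand-in for the remote object store: records each uploaded part. */
    static final class InMemoryStore {
        private final List<byte[]> parts = new ArrayList<>();

        void uploadPart(byte[] data) {
            parts.add(data);
        }

        List<byte[]> parts() {
            return parts;
        }
    }

    /** Buffers writes locally; sync() pushes whatever is buffered to the store. */
    static final class BufferedPartStream {
        private final ReentrantLock lock = new ReentrantLock();
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final InMemoryStore store;

        BufferedPartStream(InMemoryStore store) {
            this.store = store;
        }

        void write(byte[] data) throws IOException {
            lock.lock();
            try {
                buffer.write(data);
            } finally {
                lock.unlock();
            }
        }

        void sync() throws IOException {
            lock.lock(); // 1. take the lock first
            try {
                buffer.flush(); // 2. flush the local stream
                if (buffer.size() > 0) {
                    // 3. commit the remaining bytes as one part
                    store.uploadPart(buffer.toByteArray());
                    buffer.reset();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        InMemoryStore store = new InMemoryStore();
        BufferedPartStream out = new BufferedPartStream(store);
        out.write("hello".getBytes(StandardCharsets.UTF_8));
        out.sync();
        out.sync(); // nothing buffered, so no additional part is uploaded
        System.out.println("uploaded parts: " + store.parts().size());
    }
}
```

Resetting the buffer inside the same critical section as the upload is the design choice that keeps a second `sync()` from committing the same bytes again in this sketch.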