Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5811#discussion_r179743870
--- Diff:
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
---
@@ -366,6 +384,11 @@ public void
initializeState(FunctionInitializationContext context) throws Except
throw new RuntimeException("Error while creating
FileSystem when initializing the state of the BucketingSink.", e);
}
+ // sync on flush for local file systems
+ if (localSyncOnFlush && (fs instanceof LocalFileSystem) &&
(writerTemplate instanceof StreamWriterBase)) {
--- End diff --
Shouldn't `(writerTemplate instanceof StreamWriterBase)` check be converted
here into `checState(writerTemplate instanceof StreamWriterBase)` inside the if
branch, and the same check be extracted and validated whenever user calls:
`setWriter(...)` or `setLocalSyncOnFlush(...)`? So that non
`StreamWriterBase` and `localSyncOnFlush = true` would be invalid
configuration? Otherwise users might experience `wtf` moments when flag is
being ignored after changing their writer.
---