Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168470916 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -553,27 +616,36 @@ public boolean supportsAsynchronousSnapshots() { metaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator)); + LocalRecoveryDirectoryProvider directoryProvider = + FsStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryConfig.getLocalRecoveryMode()) ? + localRecoveryConfig.getLocalStateDirectories() : + null; + + final ThrowingSupplier<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier = + () -> new CheckpointStreamWithResultProvider.Factory().create( + checkpointId, + CheckpointedStateScope.EXCLUSIVE, --- End diff -- There is a compilation error. The second argument is expected to be a `long`.
---