Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4798#discussion_r146165645
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -235,6 +235,10 @@ public RocksDBKeyedStateBackend(
this.instanceBasePath =
Preconditions.checkNotNull(instanceBasePath);
this.instanceRocksDBPath = new File(instanceBasePath, "db");
+ // Clear this directory when the backend is created
+ // in case something crashed and the backend never reached
dispose()
+ cleanInstanceBasePath();
+
if (!instanceBasePath.exists()) {
--- End diff --
You are right
---