Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r165343021
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
---
@@ -156,30 +157,38 @@ public void acknowledgeCheckpoint(
}
};
- TaskLocalStateStore taskLocalStateStore = new
TaskLocalStateStore(jobID, jobVertexID, subtaskIdx) {
- @Override
- public void storeLocalState(
- @Nonnull CheckpointMetaData checkpointMetaData,
- @Nullable TaskStateSnapshot localState) {
-
- Assert.assertEquals(tm, localState);
- tmReported.set(true);
- }
- };
+ TemporaryFolder temporaryFolder = new TemporaryFolder();
- TaskStateManagerImpl taskStateManager =
- new TaskStateManagerImpl(
- jobID,
- executionAttemptID,
- taskLocalStateStore,
- null,
- checkpointResponder);
-
- taskStateManager.reportTaskStateSnapshots(
- checkpointMetaData,
- checkpointMetrics,
- jm,
- tm);
+ try {
+ TaskLocalStateStore taskLocalStateStore =
+ new TaskLocalStateStore(jobID, jobVertexID,
subtaskIdx, temporaryFolder.newFolder()) {
+ @Override
+ public void storeLocalState(
+ @Nonnull CheckpointMetaData
checkpointMetaData,
+ @Nullable TaskStateSnapshot
localState) {
+
+ Assert.assertEquals(tm,
localState);
+ tmReported.set(true);
+ }
+ };
+
+ TaskStateManagerImpl taskStateManager =
+ new TaskStateManagerImpl(
+ jobID,
+ executionAttemptID,
+ taskLocalStateStore,
+ null,
+ checkpointResponder);
+
+ taskStateManager.reportTaskStateSnapshots(
+ checkpointMetaData,
+ checkpointMetrics,
+ jm,
+ tm);
+ } catch (Exception ex) {
+ temporaryFolder.delete();
+ throw new RuntimeException(ex);
--- End diff --
Why do we throw a `RuntimeException`?
---