[
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126152#comment-16126152
]
ASF GitHub Bot commented on FLINK-7213:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/4353#discussion_r133022095
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
@@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
- OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
- OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
-
- Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
-
- operatorStates.put(opID1, new SpyInjectingOperatorState(
- opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
- operatorStates.put(opID2, new SpyInjectingOperatorState(
- opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
-
// check that the vertices received the trigger checkpoint message
{
verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
}
+ OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+ OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+ TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
--- End diff --
Why not create a proper `TaskStateSnapshot` with one entry, rather than
mocking?
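
To illustrate the suggestion, here is a minimal, self-contained sketch of building a real single-entry snapshot instead of a mock. The `Snapshot` and `SubtaskState` classes and the `singleEntrySnapshot` helper are hypothetical stand-ins written for this example, not Flink's `TaskStateSnapshot` API; operator IDs are plain strings here for brevity.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotSketch {

    // Hypothetical stand-in for per-operator subtask state; not a Flink class.
    static final class SubtaskState {
        final long stateSize;

        SubtaskState(long stateSize) {
            this.stateSize = stateSize;
        }
    }

    // Hypothetical stand-in for a task snapshot keyed by explicit operator ID.
    static final class Snapshot {
        private final Map<String, SubtaskState> byOperatorId = new HashMap<>();

        void put(String operatorId, SubtaskState state) {
            byOperatorId.put(operatorId, state);
        }

        // Returns null when no state was registered for the given operator ID.
        SubtaskState get(String operatorId) {
            return byOperatorId.get(operatorId);
        }

        int size() {
            return byOperatorId.size();
        }
    }

    // Builds a real snapshot object holding exactly one entry.
    static Snapshot singleEntrySnapshot(String operatorId, long stateSize) {
        Snapshot snapshot = new Snapshot();
        snapshot.put(operatorId, new SubtaskState(stateSize));
        return snapshot;
    }

    public static void main(String[] args) {
        Snapshot snapshot = singleEntrySnapshot("op-1", 42L);
        System.out.println(snapshot.size());
        System.out.println(snapshot.get("op-1").stateSize);
        System.out.println(snapshot.get("op-2") == null);
    }
}
```

A real object like this exercises the actual lookup-by-operator-ID path in the test, whereas a mock returns whatever it was stubbed to return.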
> Introduce state management by OperatorID in TaskManager
> -------------------------------------------------------
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> FLINK-5892 introduced the job manager / checkpoint coordinator part of
> managing state at the operator level instead of the task level by introducing
> explicit operator_id -> state mappings. However, this explicit mapping was
> not introduced on the task manager side, so it is still converted into a
> mapping that follows the implicit operator chain order.
> We should also introduce explicit operator IDs to state management on the
> task manager.