[ 
https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500468#comment-17500468
 ] 

Jiangjie Qin commented on FLINK-25256:
--------------------------------------

[~dwysakowicz] Pravega essentially uses an in-band checkpoint style. The 
checkpoint process is roughly the following:
 # The Flink CheckpointCoordinator initiates the checkpoint and invokes 
MasterHooks.triggerHook(). The Pravega hook then tells the Pravega server that 
the Flink job has triggered a checkpoint.
 # The Pravega server inserts checkpoint control messages into the data 
stream of each of the Pravega readers of the Flink job.
 # When the Pravega readers see the checkpoint control messages, they trigger 
the Flink task checkpoint via the {{ExternallyInducedSource.CheckpointTrigger}}.
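
To make step 3 concrete, here is a rough toy model of the in-band flow. It is 
only a sketch: StreamElement, CheckpointTrigger and read() are made-up names 
for illustration and are not Flink or Pravega APIs. The control message travels 
in the same stream as the data, and the reader fires the trigger when it 
encounters one.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of in-band checkpointing. None of these types are Flink or Pravega APIs. */
public class InBandCheckpointSketch {

    /** One stream element: either a data event or a checkpoint control message. */
    static final class StreamElement {
        final String payload;      // null for control messages
        final long checkpointId;   // -1 for data events

        private StreamElement(String payload, long checkpointId) {
            this.payload = payload;
            this.checkpointId = checkpointId;
        }

        static StreamElement data(String payload) {
            return new StreamElement(payload, -1L);
        }

        static StreamElement checkpoint(long checkpointId) {
            return new StreamElement(null, checkpointId);
        }

        boolean isCheckpoint() {
            return payload == null;
        }
    }

    /** Hypothetical stand-in for the callback a reader uses to ask its task to checkpoint. */
    interface CheckpointTrigger {
        void triggerCheckpoint(long checkpointId);
    }

    /** Reads elements in order, collecting data and firing the trigger on control messages. */
    static List<String> read(List<StreamElement> stream, CheckpointTrigger trigger) {
        List<String> emitted = new ArrayList<>();
        for (StreamElement element : stream) {
            if (element.isCheckpoint()) {
                trigger.triggerCheckpoint(element.checkpointId);
            } else {
                emitted.add(element.payload);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Long> triggered = new ArrayList<>();
        List<StreamElement> stream = List.of(
                StreamElement.data("a"),
                StreamElement.data("b"),
                StreamElement.checkpoint(7L),   // inserted by the "server" in this toy model
                StreamElement.data("c"));
        List<String> emitted = read(stream, triggered::add);
        System.out.println(emitted + " checkpoints=" + triggered);
    }
}
```

The point of the sketch is that the checkpoint position is defined by where the 
control message sits in the stream, not by when the coordinator asked for it.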

After FLIP-27, the SplitEnumerator can completely replace the MasterHook in the 
JM. But the Pravega connector still relies on the 
{{ExternallyInducedSource.CheckpointTrigger}} to perform checkpoints in the 
subtasks.

Ultimately, the requirement is to manipulate the task based on some user-space 
records. A similar requirement is stopping the subtask when it sees a given 
message in the stream. What we need to think about is how many control-plane 
actions we want to expose to users. So far the two actions we have seen are 
taking a checkpoint on the tasks and stopping the tasks, and so far such 
manipulation requirements exist only in the Source tasks.

We can probably just make such record-driven task actions a more explicit 
primitive for users. For example, we can have an interface like 
{{TaskActionTrigger}}, which is passed to each piece of user logic and allows 
it to ask the task to take some action based on the records it has 
processed. That said, I do think such control-plane exposure should be minimal.
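
As a rough sketch of what such a primitive might look like (purely 
hypothetical: only the name TaskActionTrigger comes from the suggestion above; 
the two methods, the "CHK:<id>" and "STOP" control records, and the helper 
classes are assumptions made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical shape of a record-driven task action primitive. Not an existing Flink API. */
public class TaskActionTriggerSketch {

    /** Deliberately minimal: only the two actions discussed so far. */
    interface TaskActionTrigger {
        void triggerCheckpoint(long checkpointId);
        void stopTask();
    }

    /** Test double that records the requested actions instead of acting on a real task. */
    static final class RecordingTrigger implements TaskActionTrigger {
        final List<String> actions = new ArrayList<>();

        @Override
        public void triggerCheckpoint(long checkpointId) {
            actions.add("checkpoint:" + checkpointId);
        }

        @Override
        public void stopTask() {
            actions.add("stop");
        }
    }

    /**
     * Example user logic. Control records are assumed to look like "CHK:<id>" or "STOP";
     * everything else is treated as data. Processing ends once a stop is requested.
     */
    static List<String> process(List<String> records, TaskActionTrigger trigger) {
        List<String> emitted = new ArrayList<>();
        for (String record : records) {
            if (record.startsWith("CHK:")) {
                trigger.triggerCheckpoint(Long.parseLong(record.substring(4)));
            } else if (record.equals("STOP")) {
                trigger.stopTask();
                break; // records after the stop message are not processed
            } else {
                emitted.add(record);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        RecordingTrigger trigger = new RecordingTrigger();
        List<String> emitted = process(List.of("a", "CHK:3", "b", "STOP", "c"), trigger);
        System.out.println(emitted + " actions=" + trigger.actions);
    }
}
```

Keeping the interface to these two methods reflects the point that the exposed 
control plane should stay small; anything beyond them would need its own 
justification.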

> Savepoints do not work with ExternallyInducedSources
> ----------------------------------------------------
>
>                 Key: FLINK-25256
>                 URL: https://issues.apache.org/jira/browse/FLINK-25256
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0, 1.13.3
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> It is not possible to take a proper savepoint with 
> {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy 
> and FLIP-27 versions). The problem is that we're hardcoding 
> {{CheckpointOptions}} in the {{triggerHook}}.
> The outcome of the current state is that operators try to take checkpoints 
> in the checkpoint location, whereas the {{CheckpointCoordinator}} writes 
> metadata for those states in the savepoint location.
> Moreover, the situation gets even weirder (I have not checked it entirely) if 
> we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In 
> such a case the location and format in which the state of a particular task 
> is persisted depend on the order in which the barriers arrive. If a barrier 
> from a regular source arrives last, the task takes a savepoint; if the last 
> barrier is from an externally induced source, it takes a checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
