[https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500468#comment-17500468]
Jiangjie Qin commented on FLINK-25256:
--------------------------------------
[~dwysakowicz] Pravega essentially uses in-band checkpointing. The
checkpoint process is roughly as follows:
# The Flink {{CheckpointCoordinator}} initiates the checkpoint and invokes
{{MasterHooks.triggerHook()}}. The Pravega hook then tells the Pravega server
that the Flink job has triggered a checkpoint.
# The Pravega server inserts checkpoint control messages into the data
stream of each Pravega reader of the Flink job.
# When the Pravega readers see the checkpoint control messages, they trigger
the Flink task checkpoint via {{ExternallyInducedSource.CheckpointTrigger}}.
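The three steps above can be modeled as a toy, single-threaded sketch. {{ServerStub}}, {{Reader}} and {{Element}} are hypothetical stand-ins, not Pravega or Flink classes, and a plain {{LongConsumer}} only plays the role of {{ExternallyInducedSource.CheckpointTrigger}}. The point it illustrates is that the checkpoint is taken exactly where the marker sits in each reader's stream, so records before the marker belong to the checkpoint and records after it do not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongConsumer;

public class InBandCheckpointSketch {

    /** A stream element: either a data record or an in-band checkpoint marker. */
    static final class Element {
        final String data;        // null for checkpoint markers
        final long checkpointId;  // -1 for data records

        private Element(String data, long checkpointId) {
            this.data = data;
            this.checkpointId = checkpointId;
        }

        static Element record(String data) { return new Element(data, -1L); }
        static Element marker(long checkpointId) { return new Element(null, checkpointId); }
        boolean isMarker() { return checkpointId >= 0; }
    }

    /** Hypothetical stand-in for the Pravega server: one queue per reader. */
    static final class ServerStub {
        final List<Queue<Element>> streams = new ArrayList<>();

        ServerStub(int readers) {
            for (int i = 0; i < readers; i++) {
                streams.add(new ArrayDeque<>());
            }
        }

        void append(int reader, String data) {
            streams.get(reader).add(Element.record(data));
        }

        // Step 2: called by the master hook (step 1); inserts a marker into every reader's stream.
        void onCheckpointTriggered(long checkpointId) {
            for (Queue<Element> q : streams) {
                q.add(Element.marker(checkpointId));
            }
        }
    }

    /** Hypothetical reader subtask; checkpointTrigger stands in for the task's checkpoint trigger. */
    static final class Reader {
        final Queue<Element> stream;
        final LongConsumer checkpointTrigger;
        final List<String> emitted = new ArrayList<>();
        final List<Integer> emittedAtCheckpoint = new ArrayList<>();

        Reader(Queue<Element> stream, LongConsumer checkpointTrigger) {
            this.stream = stream;
            this.checkpointTrigger = checkpointTrigger;
        }

        // Step 3: drain the stream; a marker triggers the task checkpoint at exactly that position.
        void poll() {
            Element e;
            while ((e = stream.poll()) != null) {
                if (e.isMarker()) {
                    emittedAtCheckpoint.add(emitted.size());
                    checkpointTrigger.accept(e.checkpointId);
                } else {
                    emitted.add(e.data);
                }
            }
        }
    }

    public static void main(String[] args) {
        ServerStub server = new ServerStub(2);
        Reader r0 = new Reader(server.streams.get(0), id -> System.out.println("reader0: checkpoint " + id));
        Reader r1 = new Reader(server.streams.get(1), id -> System.out.println("reader1: checkpoint " + id));

        server.append(0, "a");
        server.append(1, "b");
        server.onCheckpointTriggered(1L); // step 1: the master hook notifies the server
        server.append(0, "c");

        r0.poll();
        r1.poll();
        System.out.println(r0.emitted + " " + r0.emittedAtCheckpoint); // [a, c] [1]
    }
}
```

In this model the coordinator never sends a barrier to the readers directly; it only asks the server to inject markers, which is why the reader side still needs some way to ask its task for a checkpoint.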
After FLIP-27, the {{SplitEnumerator}} can completely replace the MasterHook in
the JM. But the Pravega connector still relies on
{{ExternallyInducedSource.CheckpointTrigger}} to trigger checkpoints in the
subtasks.
Ultimately, the requirement is to manipulate the task based on some user-space
records. A similar requirement is stopping the subtask when it sees a given
message in the stream. What we need to decide is how many control-plane
actions we want to expose to users. So far the two actions we see are
taking checkpoints on the tasks and stopping the tasks, and currently such
manipulation requirements exist only in the Source tasks.
We can probably just make such record-driven task actions a more explicit
primitive for users. For example, we could have an interface like
{{TaskActionTrigger}}, which is passed to the user logic and allows it to ask
the task to take some action based on the records it has processed. That said,
I do think such control-plane exposure should be minimal.
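A rough sketch of what such a {{TaskActionTrigger}} could look like, assuming it is a small interface handed to user code. Nothing here exists in Flink: the method name {{request}}, the {{TaskAction}} enum, the {{ControlAwareReader}} class and the string control records are all hypothetical. The closed enum limits the exposure to the two actions mentioned above (checkpoint and stop).

```java
import java.util.ArrayList;
import java.util.List;

public class TaskActionTriggerSketch {

    /** Hypothetical: the only record-driven actions discussed so far. */
    enum TaskAction { TRIGGER_CHECKPOINT, STOP }

    /** Hypothetical: handed to user logic; user code can only request an action, the task decides. */
    interface TaskActionTrigger {
        void request(TaskAction action);
    }

    /** Example user logic that reacts to control records found in the data stream. */
    static final class ControlAwareReader {
        private final TaskActionTrigger trigger;
        final List<String> output = new ArrayList<>();

        ControlAwareReader(TaskActionTrigger trigger) {
            this.trigger = trigger;
        }

        void process(String record) {
            switch (record) {
                case "CHECKPOINT":
                    trigger.request(TaskAction.TRIGGER_CHECKPOINT);
                    break;
                case "END":
                    trigger.request(TaskAction.STOP);
                    break;
                default:
                    output.add(record); // ordinary data record
            }
        }
    }

    public static void main(String[] args) {
        List<TaskAction> requested = new ArrayList<>();
        ControlAwareReader reader = new ControlAwareReader(requested::add);
        for (String r : new String[] {"x", "CHECKPOINT", "y", "END"}) {
            reader.process(r);
        }
        System.out.println(reader.output + " " + requested); // [x, y] [TRIGGER_CHECKPOINT, STOP]
    }
}
```

Keeping the set of actions an enum rather than an open callback is one way to keep the control-plane surface minimal; adding an action then becomes an explicit API change.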
> Savepoints do not work with ExternallyInducedSources
> ----------------------------------------------------
>
> Key: FLINK-25256
> URL: https://issues.apache.org/jira/browse/FLINK-25256
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.0, 1.13.3
> Reporter: Dawid Wysakowicz
> Priority: Major
>
> It is not possible to take a proper savepoint with
> {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy
> and FLIP-27 versions). The problem is that we're hardcoding
> {{CheckpointOptions}} in the {{triggerHook}}.
> The outcome of the current state is that operators would try to take
> checkpoints in the checkpoint location, whereas the {{CheckpointCoordinator}}
> writes metadata for those states to the savepoint location.
> Moreover, the situation gets even weirder (I have not checked it entirely) if
> we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In
> such a case, the location and format in which the state of a particular task
> is persisted depend on the order in which barriers arrive. If the barrier
> from a regular source arrives last, the task takes a savepoint; if the last
> barrier comes from an externally induced source, it takes a checkpoint.
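The barrier-order dependence described above can be illustrated with a toy model. It is not Flink's code: {{Barrier}}, {{fromSource}} and {{snapshotTypeAfterAlignment}} are hypothetical, and the "last arriving barrier decides" rule is taken from the description, which itself notes it was not fully checked. Externally induced sources are modeled as always producing a checkpoint-type barrier, mirroring the hardcoded {{CheckpointOptions}}.

```java
import java.util.Arrays;
import java.util.List;

public class BarrierOrderSketch {

    enum SnapshotType { CHECKPOINT, SAVEPOINT }

    /** Hypothetical barrier as seen by a downstream task: its input name and snapshot type. */
    static final class Barrier {
        final String input;
        final SnapshotType type;

        Barrier(String input, SnapshotType type) {
            this.input = input;
            this.type = type;
        }
    }

    // Externally induced sources use a hardcoded CHECKPOINT type (the reported bug);
    // regular sources forward whatever the coordinator requested.
    static Barrier fromSource(String input, boolean externallyInduced, SnapshotType requested) {
        return new Barrier(input, externallyInduced ? SnapshotType.CHECKPOINT : requested);
    }

    // Assumed rule from the issue: after alignment, the last arriving barrier decides the type.
    static SnapshotType snapshotTypeAfterAlignment(List<Barrier> arrivalOrder) {
        return arrivalOrder.get(arrivalOrder.size() - 1).type;
    }

    public static void main(String[] args) {
        SnapshotType requested = SnapshotType.SAVEPOINT;
        Barrier regular = fromSource("regular", false, requested);
        Barrier induced = fromSource("induced", true, requested);

        System.out.println(snapshotTypeAfterAlignment(Arrays.asList(induced, regular))); // SAVEPOINT
        System.out.println(snapshotTypeAfterAlignment(Arrays.asList(regular, induced))); // CHECKPOINT
    }
}
```

The same savepoint request yields two different outcomes depending only on arrival order, which is the inconsistency the issue describes.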
--
This message was sent by Atlassian Jira
(v8.20.1#820001)