[ 
https://issues.apache.org/jira/browse/FLINK-23495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392227#comment-17392227
 ] 

Brachi Packter commented on FLINK-23495:
----------------------------------------

Got it. Thanks for explanation.

What happens in case of errors in the downstream flow? Is the checkpoint still 
completed and ack the messages? 

I'm asking, because I can ack the messages, right after pulling them(as the 
bellow code snippet ), but it doesn't check for errors, question is if the 
checkpoint process aware of errors, or it just make checkpoints any certain 
time?

 

 
{noformat}
private void processMessage(
        SourceContext<OUT> sourceContext,
        List<ReceivedMessage> messages,
        PubSubCollector collector)
        throws Exception {
    rateLimiter.acquire(messages.size());

    synchronized (sourceContext.getCheckpointLock()) {
        for (ReceivedMessage message : messages) {
            acknowledgeOnCheckpoint.addAcknowledgeId(message.getAckId());

            PubsubMessage pubsubMessage = message.getMessage();

            deserializationSchema.deserialize(pubsubMessage, collector);
            if (collector.isEndOfStreamSignalled()) {
                cancel();
                return;
            }
        }
        if (withoutCheckpoint) {
            List<String> acknowledgeIds =
                    messages.stream()
                            .map(ReceivedMessage::getAckId)
                            .collect(Collectors.toList());
            subscriber.acknowledge(acknowledgeIds);
        }
    }
}{noformat}
 

 

> [GCP PubSub] Make checkpoint optional for preview/staging mode
> --------------------------------------------------------------
>
>                 Key: FLINK-23495
>                 URL: https://issues.apache.org/jira/browse/FLINK-23495
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Google Cloud PubSub
>    Affects Versions: 1.13.0, 1.13.1
>            Reporter: Brachi Packter
>            Priority: Major
>              Labels: pull-request-available
>
> I'm using PubSub connector with Flink sql.
> The issue that I get all the time error that PubSub required checkpoints, My 
> question is if I/you can submit a PR that adds a property that can configure 
> PubSub to start without checkpoints, and we can describe that it is just for 
> preview/staging mode (interactive sql, Jupiter..)
> Other connectors support starting without checkpoints.
> What will be the impact for this change? I tried it locally and it seems to 
> work ok. 
> That is the code that always fail the source if no checkpoint is configured, 
> i want to add some condition here:
> {code:java}
> if (hasNoCheckpointingEnabled(getRuntimeContext())) { 
> throw new IllegalArgumentException( "The PubSubSource REQUIRES Checkpointing 
> to be enabled and " + "the checkpointing frequency must be MUCH lower than 
> the PubSub timeout for it to retry a message."); 
> }
>  
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to