[ 
https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552401#comment-17552401
 ] 

Dmytro edited comment on FLINK-27963 at 6/9/22 8:26 PM:
--------------------------------------------------------

[~martijnvisser], I have an idea for how any exception coming from the Kafka side could be handled properly by developers who use the Flink Kafka connectors in their projects.

*The idea:*
Allow overriding the [KafkaWriter::deliveryCallback|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L90] field (a private final field in 1.15.0 and 1.14.4, as well as in the upcoming 1.16) with a custom delivery callback. The custom callback may suppress an [ApiException|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java] if the project architecture requires it (a sketch follows below). Also, if deliveryCallback is set to null, it is not used in the [KafkaProducer::doSend|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L966] method at all. In this "deliveryCallback = null" case, all exceptions can be handled in a custom [producer interceptor|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java] (see the interceptor sketch further below), because "[this.interceptors.onSendError(record, tp, e);|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1062]" is called in every catch block of [KafkaProducer::doSend|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L966].
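
For illustration, here is a minimal sketch of such a custom delivery callback; the class name, the logging, and the choice to suppress only ApiException are my assumptions, not existing connector code:
{code:java}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ApiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical custom delivery callback: skips records that failed with an
// ApiException instead of failing the task (which would restart the job).
public class SuppressingDeliveryCallback implements Callback {

    private static final Logger LOG =
            LoggerFactory.getLogger(SuppressingDeliveryCallback.class);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception instanceof ApiException) {
            // Log and keep processing; the failed record is dropped.
            LOG.warn("Record could not be delivered to Kafka, skipping it", exception);
        } else if (exception != null) {
            // A real implementation would likely forward other errors to the
            // task thread, as the default WriterCallback does via the mailbox executor.
            LOG.error("Unexpected Kafka send failure", exception);
        }
    }
}
{code}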

From my perspective, "to manage exceptions" means skipping failed messages without a restart-fail-restart loop: just log the errors and do not stop the whole data processing.
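
For the "deliveryCallback = null" variant mentioned above, here is a minimal sketch of such a producer interceptor (again, the class name and logging are my assumptions). Kafka delivers send errors to interceptors through onAcknowledgement, which is what ProducerInterceptors.onSendError ends up calling:
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical interceptor that observes failed sends and only logs them.
public class LoggingProducerInterceptor implements ProducerInterceptor<byte[], byte[]> {

    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingProducerInterceptor.class);

    @Override
    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            LOG.warn("Kafka send failed, the record is skipped", exception);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
{code}
It would be registered through the standard "interceptor.classes" producer property in the Properties passed to the sink.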

*Possible solution:*

Add a new "Callback deliveryCallback" parameter to the [KafkaWriter constructor|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L132]:
{code:java}
KafkaWriter(
        DeliveryGuarantee deliveryGuarantee,
        Properties kafkaProducerConfig,
        String transactionalIdPrefix,
        Sink.InitContext sinkInitContext,
        KafkaRecordSerializationSchema<IN> recordSerializer,
        SerializationSchema.InitializationContext schemaContext,
        Collection<KafkaWriterState> recoveredStates,
        Callback deliveryCallback) {
    this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee");
    this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
    this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
    this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer");
    this.deliveryCallback =
            deliveryCallback == null
                    ? new WriterCallback(
                            sinkInitContext.getMailboxExecutor(),
                            sinkInitContext.<RecordMetadata>metadataConsumer().orElse(null))
                    : deliveryCallback;
    // ... rest of the constructor unchanged
}
{code}
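
For completeness, this is how the parameter might be exposed to users; note that the setDeliveryCallback builder method below is hypothetical and does not exist in any released version:
{code:java}
// Hypothetical wiring: setDeliveryCallback(...) is NOT an existing
// KafkaSinkBuilder method; it only illustrates how the new constructor
// parameter could be surfaced to users.
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("test.output")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryCallback(new SuppressingDeliveryCallback()) // hypothetical
                .build();
{code}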

*Some details:*

To produce/sink a message, the following call stack is involved (flink-connector-kafka v1.15.0):

-> [KafkaSink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java]

-> [KafkaWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java]
     creates a delivery callback (private final Callback deliveryCallback) and passes it to the [doSend|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L966] method of the [KafkaProducer|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java] below

-> [KafkaProducer|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java]
     the [doSend|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L966] method calls [KafkaWriter::deliveryCallback::onCompletion(null, exception)|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1056] if an [ApiException|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java] occurs:

 
{code:java}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// .......

} catch (ApiException e) {
    log.debug("Exception occurred during message send:", e);
    if (callback != null)
        callback.onCompletion(null, e);
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    return new FutureFailure(e);
} catch (InterruptedException e) {
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    throw new InterruptException(e);
} catch (KafkaException e) {
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    throw e;
} catch (Exception e) {
    // we notify interceptor about all exceptions, since onSend is called before anything else in this method
    this.interceptors.onSendError(record, tp, e);
    throw e;
}
{code}
-> [KafkaWriter::onCompletion|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L413] throws an exception if the exception input parameter is not null:
{code:java}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer =
                KafkaWriter.this.currentProducer;
        mailboxExecutor.execute(
                () -> {
                    numRecordsOutErrorsCounter.inc();
                    numRecordsSendErrorsCounter.inc();
                    throwException(metadata, exception, producer);
                },
                "Failed to send data to Kafka");
    }

    if (metadataConsumer != null) {
        metadataConsumer.accept(metadata);
    }
} {code}
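Because throwException(...) runs on the task's mailbox thread, it fails the task; with a restart strategy configured, the job then re-enters the "exception -> send again" loop described in this issue.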
 

 



> FlinkRuntimeException in KafkaSink causes a Flink job to hang
> -------------------------------------------------------------
>
>                 Key: FLINK-27963
>                 URL: https://issues.apache.org/jira/browse/FLINK-27963
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: Dmytro
>            Priority: Major
>              Labels: FlinkRuntimeException, KafkaSink
>
> If a FlinkRuntimeException occurs in the 
> [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink], then the Flink job tries to re-send the failed data again and gets into an endless "exception -> send again" loop.
> *Code sample which throws the FlinkRuntimeException:*
> {code:java}
> int numberOfRows = 1;
> int rowsPerSecond = 1;
> DataStream<String> stream = environment.addSource(
>                 new DataGeneratorSource<>(
>                         RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
>                         rowsPerSecond,
>                         (long) numberOfRows),
>                 TypeInformation.of(String.class))
>         .setParallelism(1)
>         .name("string-generator");
> KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .setRecordSerializer(
>                 KafkaRecordSerializationSchema.builder()
>                         .setTopic("test.output")
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build());
> KafkaSink<String> sink = builder.build();
> stream.sinkTo(sink).setParallelism(1).name("output-producer"); {code}
> *Exception Stack Trace:*
> {code:java}
> 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
