NicoK commented on a change in pull request #16941:
URL: https://github.com/apache/flink/pull/16941#discussion_r694935051


########## File path: docs/content/docs/connectors/datastream/kafka.md ##########

```diff
@@ -848,4 +848,9 @@
 clash with the ones that have already been used by other applications. In most cases, those
 applications are also Flink jobs running the same job graph, since the IDs are generated using the
 same logic, prefixed with `taskName + "-" + operatorUid`, by default. To solve the problem, you can
 use `setTransactionalIdPrefix()` to override this logic and specify different `transactional.id`
 prefixes for different jobs.
+Other reason for this exception may be a transaction timeout on the broker side. After implementation of
+[KAFKA-6119](https://issues.apache.org/jira/browse/KAFKA-6119), `(producerId, epoch)` is fenced
+after transaction timeout and all of its pending transactions are aborted (each `transactional.id` is
+mapped to a single `producerId`, this is described in more details in the following [blog post](https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/)).
```

Review comment:

```suggestion
Another reason for this exception may be a transaction timeout on the broker side. With the implementation of
[KAFKA-6119](https://issues.apache.org/jira/browse/KAFKA-6119), the `(producerId, epoch)` will be fenced off
after a transaction timeout and all of its pending transactions are aborted (each `transactional.id` is
mapped to a single `producerId`; this is described in more detail in the following [blog post](https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/)).
```

########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java ##########

```diff
@@ -61,13 +62,25 @@ private void commitTransaction(KafkaCommittable committable) {
                 new FlinkKafkaInternalProducer<>(createKafkaProducerConfig(transactionalId))) {
             producer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
             producer.commitTransaction();
-        } catch (InvalidTxnStateException | ProducerFencedException e) {
-            // That means we have committed this transaction before.
+        } catch (InvalidTxnStateException e) {
             LOG.warn(
-                    "Encountered error {} while recovering transaction {}. "
-                            + "Presumably this transaction has been already committed before",
-                    e,
-                    committable);
+                    "Unable to commit recovered transaction ({}) because it's in invalid state. "
+                            + "Most likely the transaction has been aborted for some reason. Please check Kafka logs for more details.",
```

Review comment:

```suggestion
                    "Unable to commit recovered transaction ({}) because it's in an invalid state. "
                            + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
```

########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java ##########

```diff
@@ -1052,13 +1040,23 @@ protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transac
             producer = initTransactionalProducer(transaction.transactionalId, false);
             producer.resumeTransaction(transaction.producerId, transaction.epoch);
             producer.commitTransaction();
-        } catch (InvalidTxnStateException | ProducerFencedException ex) {
-            // That means we have committed this transaction before.
+        } catch (InvalidTxnStateException e) {
             LOG.warn(
-                    "Encountered error {} while recovering transaction {}. "
-                            + "Presumably this transaction has been already committed before",
-                    ex,
-                    transaction);
+                    "Unable to commit recovered transaction ({}) because it's in invalid state. "
+                            + "Most likely the transaction has been aborted for some reason. Please check Kafka logs for more details.",
+                    transaction,
+                    e);
+        } catch (ProducerFencedException e) {
+            LOG.warn(
+                    "Unable to commit recovered transaction ({}) because its producer is already fenced."
+                            + " This means that you either have a different producer with the same '{}' or"
+                            + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+                            + " please consult documentation for more details.",
```

Review comment:

```suggestion
+                            + " please consult the Flink documentation for more details.",
```

########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java ##########

```diff
@@ -61,13 +62,25 @@ private void commitTransaction(KafkaCommittable committable) {
+        } catch (ProducerFencedException e) {
+            LOG.warn(
+                    "Unable to commit recovered transaction ({}) because its producer is already fenced."
+                            + " This means that you either have a different producer with the same '{}' (this is"
+                            + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
+                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+                            + " please consult documentation for more details.",
+                    committable,
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                    KafkaSink.class.getSimpleName(),
```

Review comment:

Why not just inline "KafkaSink" into the message string?

########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java ##########

```diff
@@ -1052,13 +1040,23 @@ protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transac
+                    "Unable to commit recovered transaction ({}) because it's in invalid state. "
+                            + "Most likely the transaction has been aborted for some reason. Please check Kafka logs for more details.",
```

Review comment:

```suggestion
                    "Unable to commit recovered transaction ({}) because it's in an invalid state. "
                            + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
```

########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java ##########

```diff
@@ -61,13 +62,25 @@ private void commitTransaction(KafkaCommittable committable) {
+                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+                            + " please consult documentation for more details.",
```

Review comment:

I assume the Flink docs? (here, it's a bit ambiguous whether you mean the Kafka or Flink docs)

```suggestion
+                            + " please consult the Flink documentation for more details.",
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
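
As background to the docs change discussed above: the broker-side fencing from KAFKA-6119 kicks in when a transaction exceeds the producer's `transaction.timeout.ms`, so the timeout has to be sized against the job's checkpointing and recovery behavior. A minimal sketch of the producer property involved (the value below is purely illustrative, not a recommendation from this thread):

```properties
# transaction.timeout.ms should comfortably exceed the job's checkpoint
# interval plus the expected recovery time; it must not exceed the broker's
# transaction.max.timeout.ms cap (Kafka's broker default is 15 minutes).
transaction.timeout.ms=900000
```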
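
The refactoring under review, splitting the combined `catch (InvalidTxnStateException | ProducerFencedException e)` into two branches so each log line names a concrete cause, can be sketched with stdlib-only stand-ins. The two exception classes below are hypothetical placeholders for the real Kafka client types in `org.apache.kafka.common.errors`:

```java
// Hypothetical stand-ins for the Kafka client exception types; the real classes
// live in org.apache.kafka.common.errors and carry broker-side context.
class InvalidTxnStateException extends RuntimeException {}

class ProducerFencedException extends RuntimeException {}

public class CatchSplitSketch {
    // One catch block per failure mode, so each diagnostic can name a concrete
    // cause, mirroring the change under review (instead of one generic message
    // for both exception types).
    static String describe(RuntimeException e) {
        try {
            throw e;
        } catch (InvalidTxnStateException ex) {
            return "invalid state: transaction most likely aborted; check the Kafka logs";
        } catch (ProducerFencedException ex) {
            return "fenced: duplicate transactional.id, or recovery exceeded the transaction timeout";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new InvalidTxnStateException()));
        System.out.println(describe(new ProducerFencedException()));
    }
}
```

The point of the split is purely diagnostic: both branches still swallow the exception during recovery, but the operator reading the logs can tell an aborted transaction from a fenced producer without digging into broker logs first.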
