AHeise commented on a change in pull request #17152:
URL: https://github.com/apache/flink/pull/17152#discussion_r702845196
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -64,19 +65,31 @@
.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
.orElseGet(() -> getRecoveryProducer(committable));
producer.commitTransaction();
- recyclable.ifPresent(Recyclable::close);
- } catch (ProducerFencedException | InvalidTxnStateException e) {
- // That means we have committed this transaction before.
+ } catch (RetriableException e) {
LOG.warn(
- "Encountered error {} while recovering transaction {}. "
- + "Presumably this transaction has been already committed before",
- e,
- committable);
- recyclable.ifPresent(Recyclable::close);
- } catch (Throwable e) {
- LOG.warn("Cannot commit Kafka transaction, retrying.", e);
+ "Encountered retriable exception while committing {}.", transactionalId, e);
retryableCommittables.add(committable);
+ continue;
+ } catch (ProducerFencedException e) {
+ // initTransaction has been called on this transaction before
+ LOG.error(
+ "Transactions {} timed out or was overridden and data has been potentially lost.",
+ transactionalId,
+ e);
+ } catch (InvalidTxnStateException e) {
+ // This exception only occurs when aborting after a commit or vice versa.
+ // It does not appear on double commits or double aborts.
+ LOG.error(
+ "Transaction {} was previously canceled and data has been lost.",
+ committable,
+ e);
+ } catch (Exception e) {
+ LOG.error(
+ "Transaction {} encountered error and data has been potentially lost.",
+ committable,
+ e);
}
+ recyclable.ifPresent(Recyclable::close);
Review comment:
> Why can we close the producer if we might want to retry committing the
Committables? 🤔
We are not retrying them at this point. The retriable path is short-circuited
with `continue`, so it never reaches this `close` call.
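
To make the control flow concrete, here is a minimal, self-contained sketch of the same loop shape. It is not the actual `KafkaCommitter` code: `CommitLoopSketch`, `FakeProducer`, `RetriableFailure`, and `commitAll` are hypothetical stand-ins invented for illustration, and logging is omitted. It only shows that a `continue` inside the retriable `catch` skips the `close` placed after the `try`, while every other outcome falls through to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitLoopSketch {

    /** Hypothetical stand-in for Kafka's RetriableException. */
    static class RetriableFailure extends RuntimeException {}

    /** Hypothetical stand-in for a pooled producer; records whether it was closed. */
    static class FakeProducer {
        final String id;
        final RuntimeException failure; // null means the commit succeeds
        boolean closed = false;

        FakeProducer(String id, RuntimeException failure) {
            this.id = id;
            this.failure = failure;
        }

        void commitTransaction() {
            if (failure != null) {
                throw failure;
            }
        }

        void close() {
            closed = true;
        }
    }

    /** Commits each producer once and returns the ones that should be retried later. */
    static List<FakeProducer> commitAll(List<FakeProducer> producers) {
        List<FakeProducer> retryable = new ArrayList<>();
        for (FakeProducer producer : producers) {
            try {
                producer.commitTransaction();
            } catch (RetriableFailure e) {
                // Keep the producer open for a later attempt and skip the close below.
                retryable.add(producer);
                continue;
            } catch (Exception e) {
                // Non-retriable failure: nothing more to do, fall through to close.
            }
            // Reached only on success or on a non-retriable failure.
            producer.close();
        }
        return retryable;
    }

    public static void main(String[] args) {
        FakeProducer ok = new FakeProducer("ok", null);
        FakeProducer flaky = new FakeProducer("flaky", new RetriableFailure());
        FakeProducer broken = new FakeProducer("broken", new IllegalStateException("fenced"));

        List<FakeProducer> retryable = commitAll(Arrays.asList(ok, flaky, broken));

        System.out.println("retryable count: " + retryable.size());
        for (FakeProducer p : Arrays.asList(ok, flaky, broken)) {
            System.out.println(p.id + " closed: " + p.closed);
        }
    }
}
```

In this sketch only the producer whose commit throws the retriable failure stays open and lands in the retry list; the successful one and the one with a non-retriable failure are both closed.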