AHeise commented on a change in pull request #17152:
URL: https://github.com/apache/flink/pull/17152#discussion_r702814948
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -64,19 +65,31 @@
.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
.orElseGet(() -> getRecoveryProducer(committable));
producer.commitTransaction();
- recyclable.ifPresent(Recyclable::close);
- } catch (ProducerFencedException | InvalidTxnStateException e) {
- // That means we have committed this transaction before.
+ } catch (RetriableException e) {
LOG.warn(
- "Encountered error {} while recovering transaction {}. "
- + "Presumably this transaction has been already committed before",
- e,
- committable);
- recyclable.ifPresent(Recyclable::close);
- } catch (Throwable e) {
- LOG.warn("Cannot commit Kafka transaction, retrying.", e);
+ "Encountered retriable exception while committing {}.", transactionalId, e);
retryableCommittables.add(committable);
+ continue;
+ } catch (ProducerFencedException e) {
+ // initTransaction has been called on this transaction before
+ LOG.error(
+ "Transactions {} timed out or was overridden and data has been potentially lost.",
+ transactionalId,
+ e);
+ } catch (InvalidTxnStateException e) {
+ // This exception only occurs when aborting after a commit or vice versa.
+ // It does not appear on double commits or double aborts.
+ LOG.error(
+ "Transaction {} was previously canceled and data has been lost.",
+ committable,
+ e);
+ } catch (Exception e) {
Review comment:
This change was unintended, but I now intend to keep it. I don't think
we can or should handle Errors here (think of an OutOfMemoryError).
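
To illustrate the point, here is a minimal standalone sketch (not the
KafkaCommitter code) of what narrowing the catch from Throwable to
Exception changes. The names CatchScopeDemo, CommitAction and commitAll
are hypothetical and exist only for this example: failures that are
Exceptions are collected for retry, while an Error such as
OutOfMemoryError is not caught and propagates to the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CatchScopeDemo {

    /** Hypothetical stand-in for a commit attempt; not part of Flink's API. */
    interface CommitAction {
        void run() throws Exception;
    }

    /**
     * Runs every action and returns those that failed with an Exception,
     * so that they can be retried. Errors are deliberately not caught.
     */
    static List<CommitAction> commitAll(List<CommitAction> actions) {
        List<CommitAction> retry = new ArrayList<>();
        for (CommitAction action : actions) {
            try {
                action.run();
            } catch (Exception e) {
                // Covers checked and unchecked exceptions, but not Error.
                retry.add(action);
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        CommitAction ok = () -> { };
        CommitAction flaky = () -> {
            throw new IllegalStateException("transient failure");
        };
        CommitAction fatal = () -> {
            throw new OutOfMemoryError("simulated");
        };

        // Only the flaky action is scheduled for retry.
        System.out.println(commitAll(Arrays.asList(ok, flaky)).size());

        // The simulated Error escapes commitAll instead of being retried.
        try {
            commitAll(Arrays.asList(fatal));
        } catch (OutOfMemoryError e) {
            System.out.println("Error propagated: " + e.getMessage());
        }
    }
}
```

With catch (Throwable e) in commitAll, the simulated OutOfMemoryError
would instead be swallowed and the action queued for retry, which is the
behavior the comment argues against.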