XComp commented on a change in pull request #17152:
URL: https://github.com/apache/flink/pull/17152#discussion_r702934683



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -64,19 +65,31 @@
                                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                 .orElseGet(() -> getRecoveryProducer(committable));
                 producer.commitTransaction();
-                recyclable.ifPresent(Recyclable::close);
-            } catch (ProducerFencedException | InvalidTxnStateException e) {
-                // That means we have committed this transaction before.
+            } catch (RetriableException e) {
                 LOG.warn(
-                        "Encountered error {} while recovering transaction {}. "
-                                + "Presumably this transaction has been already committed before",
-                        e,
-                        committable);
-                recyclable.ifPresent(Recyclable::close);
-            } catch (Throwable e) {
-                LOG.warn("Cannot commit Kafka transaction, retrying.", e);
+                        "Encountered retriable exception while committing {}.", transactionalId, e);
                 retryableCommittables.add(committable);
+                continue;
+            } catch (ProducerFencedException e) {
+                // initTransaction has been called on this transaction before
+                LOG.error(
+                        "Transactions {} timed out or was overridden and data has been potentially lost.",
+                        transactionalId,
+                        e);
+            } catch (InvalidTxnStateException e) {
+                // This exception only occurs when aborting after a commit or vice versa.
+                // It does not appear on double commits or double aborts.
+                LOG.error(
+                        "Transaction {} was previously canceled and data has been lost.",
+                        committable,
+                        e);
+            } catch (Exception e) {
+                LOG.error(
+                        "Transaction {} encountered error and data has been potentially lost.",
+                        committable,
+                        e);
             }
+            recyclable.ifPresent(Recyclable::close);

Review comment:
       True. IMHO, the code is hard to understand here because the small `continue` is tucked away inside a catch block, so it is easy to miss that the retriable path skips the `close` call after the try/catch. I cannot come up with a quick fix, though. ¯\_(ツ)_/¯
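
       One possible direction, as a rough sketch only and not something taken from this PR: move the exception handling into a helper that reports an outcome, so the loop body decides between "retry" and "close" in one visible place instead of relying on a `continue` inside a catch block. Every name below (`CommitLoopSketch`, `RetriableError`, `CommitOutcome`, `Committer`, `tryCommit`, `commitAll`) is a hypothetical stand-in; the real code would use Kafka's exception types, the producer, and `Recyclable`, and would keep the logging that is omitted here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names come from the PR or from Kafka. */
public class CommitLoopSketch {

    /** Stand-in for a retriable Kafka exception, so the sketch has no dependencies. */
    static class RetriableError extends RuntimeException {}

    /** What the loop should do with a committable after one commit attempt. */
    enum CommitOutcome { DONE, RETRY }

    /** Stand-in for the producer's commit call. */
    interface Committer {
        void commit(String transactionalId);
    }

    /** All exception handling lives here; the return value tells the caller what to do. */
    static CommitOutcome tryCommit(Committer committer, String transactionalId) {
        try {
            committer.commit(transactionalId);
            return CommitOutcome.DONE;
        } catch (RetriableError e) {
            // Transient failure: the caller should keep the producer and retry later.
            return CommitOutcome.RETRY;
        } catch (RuntimeException e) {
            // Non-retriable failure: the real code would log it; the transaction is given up.
            return CommitOutcome.DONE;
        }
    }

    /** Returns the ids to retry; ids that are finished are appended to {@code closed}. */
    static List<String> commitAll(Committer committer, List<String> ids, List<String> closed) {
        List<String> retryable = new ArrayList<>();
        for (String id : ids) {
            if (tryCommit(committer, id) == CommitOutcome.RETRY) {
                retryable.add(id);
            } else {
                // Plays the role of recyclable.ifPresent(Recyclable::close) in the diff.
                closed.add(id);
            }
        }
        return retryable;
    }
}
```

       The trade-off is an extra enum and method; whether that reads better than the `continue` is a judgment call.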




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


