NicoK commented on a change in pull request #16941:
URL: https://github.com/apache/flink/pull/16941#discussion_r694935051



##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -848,4 +848,9 @@ clash with the ones that have already been used by other 
applications.
 In most cases, those applications are also Flink jobs running the same job 
graph, since the IDs are generated using the same logic, prefixed with 
`taskName + "-" + operatorUid`, by default.
 To solve the problem, you can use `setTransactionalIdPrefix()` to override 
this logic and specify different `transactional.id` prefixes for different jobs.
 
+Other reason for this exception may be a transaction timeout on the broker 
side. After implementation of
+[KAFKA-6119](https://issues.apache.org/jira/browse/KAFKA-6119), `(producerId, 
epoch)` is fenced
+after transaction timeout and all of its pending transactions are aborted 
(each `transactional.id` is
+mapped to a single `producerId`, this is described in more details in the 
following [blog 
post](https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/)).

Review comment:
       ```suggestion
   Another reason for this exception may be a transaction timeout on the broker 
side. With the implementation of
   [KAFKA-6119](https://issues.apache.org/jira/browse/KAFKA-6119), the 
`(producerId, epoch)` will be fenced off
   after a transaction timeout and all of its pending transactions are aborted 
(each `transactional.id` is
   mapped to a single `producerId`; this is described in more detail in the 
following [blog 
post](https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/)).
   ```
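The `transactional.id` clash described in this doc section follows directly from how the default prefix is built. A minimal, self-contained sketch of that logic (the helper names here are illustrative, not Flink API):

```java
public class TransactionalIdSketch {

    // Mirrors the default described in the docs above: the transactional.id is
    // prefixed with taskName + "-" + operatorUid, so two jobs running the same
    // job graph derive the same prefix and their producers fence each other.
    static String defaultPrefix(String taskName, String operatorUid) {
        return taskName + "-" + operatorUid;
    }

    public static void main(String[] args) {
        // Two deployments of the same job graph: identical task names and
        // operator uids, hence identical (clashing) default prefixes.
        String jobA = defaultPrefix("Sink: Writer", "kafka-sink-uid");
        String jobB = defaultPrefix("Sink: Writer", "kafka-sink-uid");
        System.out.println(jobA.equals(jobB));

        // setTransactionalIdPrefix() lets each job supply a distinct prefix
        // instead, so the generated transactional.ids no longer collide.
    }
}
```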

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -61,13 +62,25 @@ private void commitTransaction(KafkaCommittable 
committable) {
                 new 
FlinkKafkaInternalProducer<>(createKafkaProducerConfig(transactionalId))) {
             producer.resumeTransaction(committable.getProducerId(), 
committable.getEpoch());
             producer.commitTransaction();
-        } catch (InvalidTxnStateException | ProducerFencedException e) {
-            // That means we have committed this transaction before.
+        } catch (InvalidTxnStateException e) {
             LOG.warn(
-                    "Encountered error {} while recovering transaction {}. "
-                            + "Presumably this transaction has been already 
committed before",
-                    e,
-                    committable);
+                    "Unable to commit recovered transaction ({}) because it's 
in invalid state. "
+                            + "Most likely the transaction has been aborted 
for some reason. Please check Kafka logs for more details.",

Review comment:
       ```suggestion
                       "Unable to commit recovered transaction ({}) because 
it's in an invalid state. "
                               + "Most likely the transaction has been aborted 
for some reason. Please check the Kafka logs for more details.",
   ```
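The diff above replaces one shared catch block with two, so each failure mode gets its own actionable message. A self-contained sketch of that control flow, with local stand-in exception classes so it compiles without the kafka-clients dependency (nothing here is Flink or Kafka API):

```java
public class CommitRecoverySketch {

    // Local stand-ins for the Kafka client exception types.
    static class InvalidTxnStateException extends RuntimeException {}
    static class ProducerFencedException extends RuntimeException {}

    // Mirrors the split introduced in the diff above: the two failure modes
    // that previously shared one catch block are reported separately.
    static String recoverAndCommit(Runnable commit, String committable) {
        try {
            commit.run();
            return "committed";
        } catch (InvalidTxnStateException e) {
            return "Unable to commit recovered transaction (" + committable
                    + ") because it's in an invalid state."
                    + " Please check the Kafka logs for more details.";
        } catch (ProducerFencedException e) {
            return "Unable to commit recovered transaction (" + committable
                    + ") because its producer is already fenced.";
        }
    }

    public static void main(String[] args) {
        System.out.println(recoverAndCommit(() -> {}, "tx-0"));
        System.out.println(
                recoverAndCommit(() -> { throw new InvalidTxnStateException(); }, "tx-1"));
    }
}
```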

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -1052,13 +1040,23 @@ protected void 
recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transac
                 producer = 
initTransactionalProducer(transaction.transactionalId, false);
                 producer.resumeTransaction(transaction.producerId, 
transaction.epoch);
                 producer.commitTransaction();
-            } catch (InvalidTxnStateException | ProducerFencedException ex) {
-                // That means we have committed this transaction before.
+            } catch (InvalidTxnStateException e) {
                 LOG.warn(
-                        "Encountered error {} while recovering transaction {}. 
"
-                                + "Presumably this transaction has been 
already committed before",
-                        ex,
-                        transaction);
+                        "Unable to commit recovered transaction ({}) because 
it's in invalid state. "
+                                + "Most likely the transaction has been 
aborted for some reason. Please check Kafka logs for more details.",
+                        transaction,
+                        e);
+            } catch (ProducerFencedException e) {
+                LOG.warn(
+                        "Unable to commit recovered transaction ({}) because 
its producer is already fenced."
+                                + " This means that you either have a 
different producer with the same '{}' or"
+                                + " recovery took longer than '{}' ({}ms). In 
both cases this most likely signals data loss,"
+                                + " please consult documentation for more 
details.",

Review comment:
       ```suggestion
                                   + " please consult the Flink documentation 
for more details.",
   ```
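The fenced-producer warning in this diff points at recovery taking longer than the transaction timeout. A hedged sketch of that comparison (the 60000 ms fallback is my assumption about the kafka-clients default for `transaction.timeout.ms`, not something stated in this PR):

```java
import java.time.Duration;
import java.util.Properties;

public class TransactionTimeoutSketch {

    // The producer setting the warning above refers to; 60000 ms is assumed
    // to be the kafka-clients default when the key is unset.
    static long transactionTimeoutMs(Properties producerConfig) {
        return Long.parseLong(
                producerConfig.getProperty("transaction.timeout.ms", "60000"));
    }

    // If recovery took longer than the broker-side transaction timeout, the
    // (producerId, epoch) may already have been fenced and the commit fails
    // with ProducerFencedException.
    static boolean recoveryExceededTimeout(Duration recovery, Properties producerConfig) {
        return recovery.toMillis() > transactionTimeoutMs(producerConfig);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("transaction.timeout.ms", "120000");
        System.out.println(recoveryExceededTimeout(Duration.ofMinutes(3), config));
    }
}
```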

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -61,13 +62,25 @@ private void commitTransaction(KafkaCommittable 
committable) {
                 new 
FlinkKafkaInternalProducer<>(createKafkaProducerConfig(transactionalId))) {
             producer.resumeTransaction(committable.getProducerId(), 
committable.getEpoch());
             producer.commitTransaction();
-        } catch (InvalidTxnStateException | ProducerFencedException e) {
-            // That means we have committed this transaction before.
+        } catch (InvalidTxnStateException e) {
             LOG.warn(
-                    "Encountered error {} while recovering transaction {}. "
-                            + "Presumably this transaction has been already 
committed before",
-                    e,
-                    committable);
+                    "Unable to commit recovered transaction ({}) because it's 
in invalid state. "
+                            + "Most likely the transaction has been aborted 
for some reason. Please check Kafka logs for more details.",
+                    committable,
+                    e);
+        } catch (ProducerFencedException e) {
+            LOG.warn(
+                    "Unable to commit recovered transaction ({}) because its 
producer is already fenced."
+                            + " This means that you either have a different 
producer with the same '{}' (this is"
+                            + " unlikely with the '{}' as all generated ids 
are unique and shouldn't be reused)"
+                            + " or recovery took longer than '{}' ({}ms). In 
both cases this most likely signals data loss,"
+                            + " please consult documentation for more 
details.",
+                    committable,
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                    KafkaSink.class.getSimpleName(),

Review comment:
       Why not just inline "KafkaSink" into the message string?
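The reviewer's point is that `KafkaSink.class.getSimpleName()` is a compile-time-known string, so passing it as a log argument buys nothing over inlining it, whereas `ProducerConfig.TRANSACTIONAL_ID_CONFIG` is worth keeping as a constant because it tracks the real config key. A sketch showing both forms render identically, using a minimal stand-in for SLF4J's `{}` substitution (not the actual SLF4J implementation):

```java
public class PlaceholderSketch {

    // Minimal stand-in for SLF4J's "{}" placeholder substitution, so the
    // sketch runs without the slf4j dependency.
    static String format(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = template.indexOf("{}", from);
            if (at < 0) break;
            out.append(template, from, at).append(arg);
            from = at + 2;
        }
        return out.append(template.substring(from)).toString();
    }

    public static void main(String[] args) {
        // Passing the class name as an argument...
        String viaArgument = format("unlikely with the '{}'", "KafkaSink");
        // ...renders the same text as inlining the compile-time constant.
        String inlined = "unlikely with the 'KafkaSink'";
        System.out.println(viaArgument.equals(inlined));
    }
}
```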

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -1052,13 +1040,23 @@ protected void 
recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transac
                 producer = 
initTransactionalProducer(transaction.transactionalId, false);
                 producer.resumeTransaction(transaction.producerId, 
transaction.epoch);
                 producer.commitTransaction();
-            } catch (InvalidTxnStateException | ProducerFencedException ex) {
-                // That means we have committed this transaction before.
+            } catch (InvalidTxnStateException e) {
                 LOG.warn(
-                        "Encountered error {} while recovering transaction {}. 
"
-                                + "Presumably this transaction has been 
already committed before",
-                        ex,
-                        transaction);
+                        "Unable to commit recovered transaction ({}) because 
it's in invalid state. "
+                                + "Most likely the transaction has been 
aborted for some reason. Please check Kafka logs for more details.",

Review comment:
       ```suggestion
                           "Unable to commit recovered transaction ({}) because 
it's in an invalid state. "
                                   + "Most likely the transaction has been 
aborted for some reason. Please check the Kafka logs for more details.",
   ```

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -61,13 +62,25 @@ private void commitTransaction(KafkaCommittable 
committable) {
                 new 
FlinkKafkaInternalProducer<>(createKafkaProducerConfig(transactionalId))) {
             producer.resumeTransaction(committable.getProducerId(), 
committable.getEpoch());
             producer.commitTransaction();
-        } catch (InvalidTxnStateException | ProducerFencedException e) {
-            // That means we have committed this transaction before.
+        } catch (InvalidTxnStateException e) {
             LOG.warn(
-                    "Encountered error {} while recovering transaction {}. "
-                            + "Presumably this transaction has been already 
committed before",
-                    e,
-                    committable);
+                    "Unable to commit recovered transaction ({}) because it's 
in invalid state. "
+                            + "Most likely the transaction has been aborted 
for some reason. Please check Kafka logs for more details.",
+                    committable,
+                    e);
+        } catch (ProducerFencedException e) {
+            LOG.warn(
+                    "Unable to commit recovered transaction ({}) because its 
producer is already fenced."
+                            + " This means that you either have a different 
producer with the same '{}' (this is"
+                            + " unlikely with the '{}' as all generated ids 
are unique and shouldn't be reused)"
+                            + " or recovery took longer than '{}' ({}ms). In 
both cases this most likely signals data loss,"
+                            + " please consult documentation for more 
details.",

Review comment:
       I assume the Flink docs? (here, it's a bit ambiguous whether you mean 
the Kafka or Flink docs)
   ```suggestion
                               + " please consult the Flink documentation for 
more details.",
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
